4 changes: 2 additions & 2 deletions .github/workflows/cron-job-its.yml
@@ -60,7 +60,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index, append-ingestion]
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
@@ -74,7 +74,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [ input-source, kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format, append-ingestion ]
testing_group: [ kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
16 changes: 2 additions & 14 deletions .github/workflows/standard-its.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index, append-ingestion, cds-task-schema-publish-disabled, cds-coordinator-metadata-query-disabled]
testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
@@ -63,7 +63,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [input-source, kafka-index, append-ingestion]
testing_group: [kafka-index]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
@@ -101,15 +101,3 @@ jobs:
mysql_driver: org.mariadb.jdbc.Driver
override_config_path: ./environment-configs/test-groups/prepopulated-data
group: query

integration-custom-coordinator-duties-tests:
needs: changes
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
build_jdk: 17
runtime_jdk: 17
testing_groups: -Dgroups=custom-coordinator-duties
use_indexer: middleManager
override_config_path: ./environment-configs/test-groups/custom-coordinator-duties
group: custom coordinator duties
15 changes: 2 additions & 13 deletions docs/development/modules.md
@@ -326,19 +326,8 @@ The duties will be grouped into multiple groups as per the elements in list `dru
All duties in the same group will have the same run period configured by `druid.coordinator.<GROUP_NAME>.period`.
Currently, there is a single thread running the duties sequentially for each group.

For example, see `KillSupervisorsCustomDuty` for a custom coordinator duty implementation and the `custom-coordinator-duties`
integration test group which loads `KillSupervisorsCustomDuty` using the configs set in `integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties`.
This config file adds the configs below to enable a custom coordinator duty.

```properties
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
druid.coordinator.cleanupMetadata.period=PT10S
```

These configurations create a custom coordinator duty group called `cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` every 10 seconds.
The custom coordinator duty `killSupervisors` also has a config called `durationToRetain` which is set to 0 minute.
For example, see `KillSupervisorsCustomDuty` for a custom coordinator duty implementation and the `KillSupervisorsCustomDutyTest`
for sample properties that may be used to configure the `KillSupervisorsCustomDuty`.
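
For instance, a minimal sketch of such properties, assuming a duty group named `cleanupMetadata` that runs a `killSupervisors` duty every 10 seconds, could look like:

```properties
# Sketch only: one custom duty group, the duty it runs, a duty-specific config, and the group's run period.
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
druid.coordinator.cleanupMetadata.period=PT10S
```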

### Routing data through a HTTP proxy for your extension

@@ -29,11 +29,6 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
import org.apache.druid.msq.guice.MSQDurableStorageModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.guice.MSQSqlModule;
import org.apache.druid.msq.guice.SqlTaskModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
@@ -90,15 +85,7 @@ public EmbeddedDruidCluster createCluster()
"[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
)
.addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
.addExtensions(
CatalogClientModule.class,
CatalogCoordinatorModule.class,
IndexerMemoryManagementModule.class,
MSQDurableStorageModule.class,
MSQIndexingModule.class,
MSQSqlModule.class,
SqlTaskModule.class
)
.addExtensions(CatalogClientModule.class, CatalogCoordinatorModule.class)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -129,30 +129,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
{
final boolean isRollup = partitionsSpec.isForceGuaranteedRollupCompatible();

final TaskBuilder.IndexParallel indexTask =
TaskBuilder.ofTypeIndexParallel()
.dataSource(dataSource)
.timestampColumn("timestamp")
.jsonInputFormat()
.localInputSourceWithFiles(
Resources.DataFile.tinyWiki1Json(),
Resources.DataFile.tinyWiki2Json(),
Resources.DataFile.tinyWiki3Json()
)
.segmentGranularity("DAY")
.dimensions("namespace", "page", "language")
.metricAggregates(
new DoubleSumAggregatorFactory("added", "added"),
new DoubleSumAggregatorFactory("deleted", "deleted"),
new DoubleSumAggregatorFactory("delta", "delta"),
new CountAggregatorFactory("count")
)
.tuningConfig(
t -> t.withPartitionsSpec(partitionsSpec)
.withForceGuaranteedRollup(isRollup)
.withMaxNumConcurrentSubTasks(10)
.withSplitHintSpec(new MaxSizeSplitHintSpec(1, null))
);
final TaskBuilder.IndexParallel indexTask = buildIndexParallelTask(partitionsSpec, false);

runTask(indexTask, dataSource);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
@@ -211,6 +188,60 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
runQueries(dataSource3);
}

@MethodSource("getTestParamPartitionsSpec")
@ParameterizedTest(name = "partitionsSpec={0}")
public void test_runIndexTask_andAppendData(PartitionsSpec partitionsSpec)
{
final TaskBuilder.IndexParallel initialTask = buildIndexParallelTask(partitionsSpec, false);
runTask(initialTask, dataSource);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource, "10");
runGroupByQuery("Crimson Typhoon,1,905.0,9050.0");

final TaskBuilder.IndexParallel appendTask
= buildIndexParallelTask(new DynamicPartitionsSpec(null, null), true);
runTask(appendTask, dataSource);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource, "20");
runGroupByQuery("Crimson Typhoon,2,1810.0,18100.0");
}

/**
* Creates a builder for an "index_parallel" task to ingest into {@link #dataSource}.
*/
private TaskBuilder.IndexParallel buildIndexParallelTask(
PartitionsSpec partitionsSpec,
boolean appendToExisting
)
{
final boolean isRollup = partitionsSpec.isForceGuaranteedRollupCompatible();

return TaskBuilder.ofTypeIndexParallel()
.dataSource(dataSource)
.timestampColumn("timestamp")
.jsonInputFormat()
.localInputSourceWithFiles(
Resources.DataFile.tinyWiki1Json(),
Resources.DataFile.tinyWiki2Json(),
Resources.DataFile.tinyWiki3Json()
)
.segmentGranularity("DAY")
.dimensions("namespace", "page", "language")
.metricAggregates(
new DoubleSumAggregatorFactory("added", "added"),
new DoubleSumAggregatorFactory("deleted", "deleted"),
new DoubleSumAggregatorFactory("delta", "delta"),
new CountAggregatorFactory("count")
)
.appendToExisting(appendToExisting)
.tuningConfig(
t -> t.withPartitionsSpec(partitionsSpec)
.withForceGuaranteedRollup(isRollup)
.withMaxNumConcurrentSubTasks(10)
.withSplitHintSpec(new MaxSizeSplitHintSpec(1, null))
);
}

private String runTask(TaskBuilder.IndexParallel taskBuilder, String dataSource)
{
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
@@ -224,8 +255,13 @@ private void runQueries(String dataSource)
"10,2013-09-01T12:41:27.000Z,2013-08-31T01:02:33.000Z",
cluster.runSql("SELECT COUNT(*), MAX(__time), MIN(__time) FROM %s", dataSource)
);
runGroupByQuery("Crimson Typhoon,1,905.0,9050.0");
}

private void runGroupByQuery(String expectedResult)
{
Assertions.assertEquals(
"Crimson Typhoon,1,905.0,9050.0",
expectedResult,
cluster.runSql(
"SELECT \"page\", COUNT(*) AS \"rows\", SUM(\"added\"), 10 * SUM(\"added\") AS added_times_ten"
+ " FROM %s"
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.server;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;

public class KillSupervisorsCustomDutyTest extends EmbeddedClusterTestBase
{
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
.addProperty("druid.coordinator.kill.supervisor.on", "false")
.addProperty("druid.coordinator.dutyGroups", "[\"cleanupMetadata\"]")
.addProperty("druid.coordinator.cleanupMetadata.duties", "[\"killSupervisors\"]")
.addProperty("druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain", "PT0M")
.addProperty("druid.coordinator.cleanupMetadata.period", "PT0.1S");

@Override
protected EmbeddedDruidCluster createCluster()
{
return EmbeddedDruidCluster
.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.addServer(coordinator)
.addServer(new EmbeddedOverlord())
.addServer(new EmbeddedBroker());
}

@Test
public void test_customDuty_removesHistoryOfTerminatedSupervisor()
{
// Create a compaction supervisor
final CompactionSupervisorSpec supervisor = new CompactionSupervisorSpec(
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(dataSource).build(),
false,
null
);
cluster.callApi().postSupervisor(supervisor);

// Verify that the history of the supervisor has 1 entry
final List<VersionedSupervisorSpec> history = getSupervisorHistory(supervisor.getId());
Assertions.assertEquals(1, history.size());

final SupervisorSpec supervisorEntry = history.get(0).getSpec();
Assertions.assertNotNull(supervisorEntry);
Assertions.assertEquals(List.of(dataSource), supervisorEntry.getDataSources());
Assertions.assertEquals(supervisor.getId(), supervisorEntry.getId());

// Terminate the supervisor
cluster.callApi().onLeaderOverlord(o -> o.terminateSupervisor(supervisor.getId()));

// Verify that the history now has 2 entries and the latest entry is a tombstone
final List<VersionedSupervisorSpec> historyAfterTermination = getSupervisorHistory(supervisor.getId());
Assertions.assertEquals(2, historyAfterTermination.size());
Assertions.assertInstanceOf(NoopSupervisorSpec.class, historyAfterTermination.get(0).getSpec());

// Wait until the cleanup metric has been emitted
coordinator.latchableEmitter().waitForEvent(
event -> event.hasMetricName("metadata/kill/supervisor/count")
.hasValueMatching(Matchers.greaterThanOrEqualTo(1L))
);

// Verify that the history now returns 404 Not Found
MatcherAssert.assertThat(
Assertions.assertThrows(
RuntimeException.class,
() -> getSupervisorHistory(supervisor.getId())
),
ExceptionMatcher.of(RuntimeException.class).expectRootCause(
ExceptionMatcher.of(HttpResponseException.class)
.expectMessageContains("404 Not Found")
.expectMessageContains(StringUtils.format("No history for [%s]", supervisor.getId()))
)
);
}

private List<VersionedSupervisorSpec> getSupervisorHistory(String supervisorId)
{
final String url = StringUtils.format(
"/druid/indexer/v1/supervisor/%s/history",
StringUtils.urlEncode(supervisorId)
);
return cluster.callApi().serviceClient().onLeaderOverlord(
mapper -> new RequestBuilder(HttpMethod.GET, url),
new TypeReference<>() {}
);
}
}