@@ -41,7 +41,7 @@
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Seconds;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -59,6 +59,7 @@
* <p>
* Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
*/
@SuppressWarnings("resource")
public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
{
private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
@@ -95,18 +96,11 @@ public void stop()
}
};

// Increase worker capacity to handle more tasks
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
.addProperty("druid.worker.capacity", "60");

overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}")
.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced")
.addProperty("druid.manager.segments.pollDuration", "PT0.1s");

coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");
.addProperty("druid.worker.capacity", "100");

cluster.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(120)
.useDefaultTimeoutForLatchableEmitter(60)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -162,7 +156,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
}

@Test
@Timeout(125)
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
{
final String superId = dataSource + "_super_scaleup";
@@ -215,6 +208,63 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}

@Test
void test_scaleDownDuringTaskRollover()
{
final String superId = dataSource + "_super";
final int initialTaskCount = 10;

final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
.builder()
.enableTaskAutoScaler(true)
.taskCountMin(1)
.taskCountMax(10)
.taskCountStart(initialTaskCount)
.scaleActionPeriodMillis(2000)
.minTriggerScaleActionFrequencyMillis(2000)
// High idle weight ensures scale-down when tasks are mostly idle (little data to process)
.lagWeight(0.1)
.idleWeight(0.9)
.build();

final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);

// Submit the supervisor
Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));

// Wait for at least one task to be running for the datasource managed by the supervisor.
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.DATASOURCE, dataSource));

// Wait for the autoscaler to emit a metric indicating scale-down; the recommended count should be less than the current task count.
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
.hasValueMatching(Matchers.lessThan((long) initialTaskCount)));

// Wait for tasks to complete (first rollover)
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));

// Wait for a task to start running again for the datasource managed by the supervisor.
overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.DATASOURCE, dataSource));

// After rollover, verify that the running task count has decreased
// The autoscaler should have recommended fewer tasks due to high idle time
final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);

Assertions.assertTrue(
postRolloverRunningTasks < initialTaskCount,
StringUtils.format(
"Expected running task count to decrease after rollover. Initial: %d, After rollover: %d",
initialTaskCount,
postRolloverRunningTasks
)
);

// Suspend the supervisor to clean up
cluster.callApi().postSupervisor(spec.createSuspendedSpec());
}

private void produceRecordsToKafka(int recordCount, int iterations)
{
int recordCountPerSlice = recordCount / iterations;
@@ -258,7 +308,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
ioConfig -> ioConfig
.withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
.withTaskDuration(Seconds.THREE.toPeriod())
.withTaskDuration(Period.seconds(7))
.withAutoScalerConfig(autoScalerConfig)
)
.withId(supervisorId)
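For intuition about the "cost metrics (lag and idle time)" this test class exercises: the rollover test above weights idle time much more heavily than lag (idleWeight 0.9 vs. lagWeight 0.1), so mostly idle tasks should push the recommendation below the current task count. A minimal, hypothetical sketch of how such weights could fold into a bounded task-count recommendation (purely illustrative; not the actual CostBasedAutoScaler code, which is not shown in this diff):

// Illustrative only: a toy cost score combining lag and idle time.
// All names here are hypothetical; the real CostBasedAutoScaler is not part of these hunks.
final class ToyCostModel
{
  static int recommendTaskCount(double lagCost, double idleCost,
                                double lagWeight, double idleWeight,
                                int currentTaskCount, int taskCountMin, int taskCountMax)
  {
    // Positive score: lag dominates, lean toward scaling up.
    // Negative score: idle time dominates, lean toward scaling down.
    final double score = lagWeight * lagCost - idleWeight * idleCost;
    final int proposed = score > 0 ? currentTaskCount + 1
                       : score < 0 ? currentTaskCount - 1
                       : currentTaskCount;
    // Clamp to the configured bounds, mirroring taskCountMin/taskCountMax in the config above.
    return Math.max(taskCountMin, Math.min(taskCountMax, proposed));
  }
}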
@@ -502,7 +502,7 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
autoscaler = spec.createAutoscaler(supervisor);
autoscaler = supervisor.createAutoscaler(spec);

supervisor.start();
if (autoscaler != null) {
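The SupervisorManager change above inverts the autoscaler wiring: the manager now asks the supervisor for its autoscaler rather than the spec. Presumably the Supervisor interface gains a method for this with a pass-through default, so supervisor types that do not retain their autoscaler keep the previous behavior; a sketch of what that default could look like (assumed; the interface change itself is not in the hunks shown here), with SeekableStreamSupervisor overriding it further down to keep a reference:

public interface Supervisor
{
  // ... existing methods elided ...

  // Assumed default: delegate to the spec, preserving the old SupervisorManager behavior
  // for supervisors that do not need to hold on to their autoscaler.
  default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
  {
    return spec.createAutoscaler(this);
  }
}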
@@ -61,8 +61,10 @@
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -885,7 +887,7 @@ public String getType()

/**
* Tag for identifying this supervisor in thread-names, listeners, etc. tag = (type + supervisorId).
*/
*/
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@@ -926,6 +928,12 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;

/**
* Reference to the autoscaler, used for rollover-based scale-down decisions.
* Wired by {@link SupervisorManager} after supervisor creation.
*/
private volatile SupervisorTaskAutoScaler taskAutoScaler;

// snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
@@ -1306,7 +1314,7 @@ public void tryInit()
if (log.isDebugEnabled()) {
log.debug(
"Handled notice[%s] from notices queue in [%d] ms, "
+ "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
+ "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
noticeType, noticeHandleTime.millisElapsed(), getNoticesQueueSize(), supervisorId, dataSource
);
}
@@ -1677,6 +1685,13 @@ private List<ParseExceptionReport> getCurrentParseErrors()
return limitedParseErrors;
}

@Override
public SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
{
this.taskAutoScaler = spec.createAutoscaler(this);
return this.taskAutoScaler;
}

@VisibleForTesting
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
@@ -3428,6 +3443,29 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}

maybeScaleDuringTaskRollover();
}

/**
* Scales up or down the number of tasks during a task rollover, if applicable.
* <p>
* This method is invoked to determine whether a task count adjustment is needed
* during a task rollover based on the recommendations from the task auto-scaler.
*/
@VisibleForTesting
void maybeScaleDuringTaskRollover()
{
if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0) {
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
changeTaskCountInIOConfig(rolloverTaskCount);
// Force the supervisor state to be recalculated on the next iteration of runInternal().
// This appears to be the most direct way to trigger task count recalculation during the rollover.
clearAllocationInfo();
}
}
}

private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -4688,9 +4726,9 @@ protected void emitLag()

// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
final long staleMillis = sequenceLastUpdated == null
? 0
: DateTimes.nowUtc().getMillis()
- (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
? 0
: DateTimes.nowUtc().getMillis()
- (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) {
// Log at most once every twenty supervisor runs to reduce noise in the logs
if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) {
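Reading maybeScaleDuringTaskRollover() above, computeTaskCountForRollover() follows a contract where a positive return value means "apply this task count during the rollover" and anything else means "leave the count unchanged". A hypothetical autoscaler-side sketch consistent with that reading (the real CostBasedAutoScaler implementation is not part of this diff; names below are illustrative):

// Hypothetical sketch of the rollover contract implied by maybeScaleDuringTaskRollover():
// return a positive desired task count to change it at rollover time, or -1 for "no change".
final class RolloverRecommendation
{
  private final int currentTaskCount;
  private final int recommendedTaskCount;

  RolloverRecommendation(int currentTaskCount, int recommendedTaskCount)
  {
    this.currentTaskCount = currentTaskCount;
    this.recommendedTaskCount = recommendedTaskCount;
  }

  int computeTaskCountForRollover()
  {
    return (recommendedTaskCount > 0 && recommendedTaskCount != currentTaskCount)
           ? recommendedTaskCount
           : -1;
  }
}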