diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index c5b86688ea1c..140c4626d220 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -41,7 +41,7 @@
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.Seconds;
+import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -59,6 +59,7 @@
*
* Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
*/
+@SuppressWarnings("resource")
public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
{
private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
@@ -95,18 +96,11 @@ public void stop()
}
};
- // Increase worker capacity to handle more tasks
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
- .addProperty("druid.worker.capacity", "60");
-
- overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}")
- .addProperty("druid.manager.segments.useIncrementalCache", "ifSynced")
- .addProperty("druid.manager.segments.pollDuration", "PT0.1s");
-
- coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");
+ .addProperty("druid.worker.capacity", "100");
cluster.useLatchableEmitter()
- .useDefaultTimeoutForLatchableEmitter(120)
+ .useDefaultTimeoutForLatchableEmitter(60)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -162,7 +156,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
}
@Test
- @Timeout(125)
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
{
final String superId = dataSource + "_super_scaleup";
@@ -215,6 +208,63 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
+ @Test
+ void test_scaleDownDuringTaskRollover()
+ {
+ final String superId = dataSource + "_super";
+ final int initialTaskCount = 10;
+
+ final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
+ .builder()
+ .enableTaskAutoScaler(true)
+ .taskCountMin(1)
+ .taskCountMax(10)
+ .taskCountStart(initialTaskCount)
+ .scaleActionPeriodMillis(2000)
+ .minTriggerScaleActionFrequencyMillis(2000)
+ // High idle weight ensures scale-down when tasks are mostly idle (little data to process)
+ .lagWeight(0.1)
+ .idleWeight(0.9)
+ .build();
+
+ final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);
+
+ // Submit the supervisor
+ Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
+
+ // Wait for at least one task running for the datasource managed by the supervisor.
+ overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+ // Wait for autoscaler to emit metric indicating scale-down, it should be just less than the current task count.
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+ .hasValueMatching(Matchers.lessThan((long) initialTaskCount)));
+
+ // Wait for tasks to complete (first rollover)
+ overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count"));
+
+ // Wait for the task running for the datasource managed by a supervisor.
+ overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource));
+
+ // After rollover, verify that the running task count has decreased
+ // The autoscaler should have recommended fewer tasks due to high idle time
+ final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource);
+
+ Assertions.assertTrue(
+ postRolloverRunningTasks < initialTaskCount,
+ StringUtils.format(
+ "Expected running task count to decrease after rollover. Initial: %d, After rollover: %d",
+ initialTaskCount,
+ postRolloverRunningTasks
+ )
+ );
+
+ // Suspend the supervisor to clean up
+ cluster.callApi().postSupervisor(spec.createSuspendedSpec());
+ }
+
private void produceRecordsToKafka(int recordCount, int iterations)
{
int recordCountPerSlice = recordCount / iterations;
@@ -258,7 +308,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
ioConfig -> ioConfig
.withConsumerProperties(kafkaServer.consumerProperties())
.withTaskCount(taskCount)
- .withTaskDuration(Seconds.THREE.toPeriod())
+ .withTaskDuration(Period.seconds(7))
.withAutoScalerConfig(autoScalerConfig)
)
.withId(supervisorId)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 21d2a6265011..0cd799bed547 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -502,7 +502,7 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
- autoscaler = spec.createAutoscaler(supervisor);
+ autoscaler = supervisor.createAutoscaler(spec);
supervisor.start();
if (autoscaler != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 90adc68c799a..c5d3ffe873d5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -61,8 +61,10 @@
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -885,7 +887,7 @@ public String getType()
/**
* Tag for identifying this supervisor in thread-names, listeners, etc. tag = (type + supervisorId).
- */
+ */
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@@ -926,6 +928,12 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;
+ /**
+ * Reference to the autoscaler, used for rollover-based scale-down decisions.
+ * Wired by {@link SupervisorManager} after supervisor creation.
+ */
+ private volatile SupervisorTaskAutoScaler taskAutoScaler;
+
// snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
private final Map previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
@@ -1306,7 +1314,7 @@ public void tryInit()
if (log.isDebugEnabled()) {
log.debug(
"Handled notice[%s] from notices queue in [%d] ms, "
- + "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
+ + "current notices queue size [%d] for supervisor[%s] for datasource[%s].",
noticeType, noticeHandleTime.millisElapsed(), getNoticesQueueSize(), supervisorId, dataSource
);
}
@@ -1677,6 +1685,13 @@ private List getCurrentParseErrors()
return limitedParseErrors;
}
+ @Override
+ public SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+ {
+ this.taskAutoScaler = spec.createAutoscaler(this);
+ return this.taskAutoScaler;
+ }
+
@VisibleForTesting
public TaskGroup addTaskGroupToActivelyReadingTaskGroup(
int taskGroupId,
@@ -3428,6 +3443,29 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
// remove this task group from the list of current task groups now that it has been handled
activelyReadingTaskGroups.remove(groupId);
}
+
+ maybeScaleDuringTaskRollover();
+ }
+
+ /**
+ * Scales up or down the number of tasks during a task rollover, if applicable.
+ *
+ * This method is invoked to determine whether a task count adjustment is needed
+ * during a task rollover based on the recommendations from the task auto-scaler.
+ */
+ @VisibleForTesting
+ void maybeScaleDuringTaskRollover()
+ {
+ if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
+ int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
+ if (rolloverTaskCount > 0) {
+ log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
+ changeTaskCountInIOConfig(rolloverTaskCount);
+ // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
+ // This seems the best way to inject task amount recalculation during the rollover.
+ clearAllocationInfo();
+ }
+ }
}
private DateTime computeEarliestTaskStartTime(TaskGroup group)
@@ -4688,9 +4726,9 @@ protected void emitLag()
// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
final long staleMillis = sequenceLastUpdated == null
- ? 0
- : DateTimes.nowUtc().getMillis()
- - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
+ ? 0
+ : DateTimes.nowUtc().getMillis()
+ - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) {
// Log at most once every twenty supervisor runs to reduce noise in the logs
if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 9ce08f936602..14e3ca2cf5e8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -33,10 +33,9 @@
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -58,8 +57,8 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
- public static final String AVG_LAG_METRIC = "task/autoScaler/costBased/avgLag";
- public static final String AVG_IDLE_METRIC = "task/autoScaler/costBased/pollIdleAvg";
+ public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
+ public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
private final String supervisorId;
@@ -70,6 +69,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private final ServiceMetricEvent.Builder metricBuilder;
private final ScheduledExecutorService autoscalerExecutor;
private final WeightedCostFunction costFunction;
+ private volatile CostMetrics lastKnownMetrics;
public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
@@ -86,7 +86,8 @@ public CostBasedAutoScaler(
this.costFunction = new WeightedCostFunction();
- this.autoscalerExecutor = Execs.scheduledSingleThreaded("CostBasedAutoScaler-" + StringUtils.encodeForFormat(spec.getId()));
+ this.autoscalerExecutor = Execs.scheduledSingleThreaded("CostBasedAutoScaler-"
+ + StringUtils.encodeForFormat(spec.getId()));
this.metricBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(
@@ -98,12 +99,8 @@ public CostBasedAutoScaler(
@Override
public void start()
{
- Callable scaleAction = () -> computeOptimalTaskCount(this.collectMetrics());
- Runnable onSuccessfulScale = () -> {
- };
-
autoscalerExecutor.scheduleAtFixedRate(
- supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
+ supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter),
config.getScaleActionPeriodMillis(),
config.getScaleActionPeriodMillis(),
TimeUnit.MILLISECONDS
@@ -129,46 +126,25 @@ public void reset()
// No-op.
}
- private CostMetrics collectMetrics()
+ @Override
+ public int computeTaskCountForRollover()
{
- if (spec.isSuspended()) {
- log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId);
- return null;
- }
-
- final LagStats lagStats = supervisor.computeLagStats();
- if (lagStats == null) {
- log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId);
- return null;
- }
-
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
- final int partitionCount = supervisor.getPartitionCount();
-
- final Map> taskStats = supervisor.getStats();
- final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
- final double pollIdleRatio = extractPollIdleRatio(taskStats);
+ return computeOptimalTaskCount(lastKnownMetrics);
+ }
- final double avgPartitionLag = lagStats.getAvgLag();
+ public int computeTaskCountForScaleAction()
+ {
+ lastKnownMetrics = collectMetrics();
+ final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
+ final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
- // Use an actual 15-minute moving average processing rate if available
- final double avgProcessingRate;
- if (movingAvgRate > 0) {
- avgProcessingRate = movingAvgRate;
- } else {
- // Fallback: estimate processing rate based on idle ratio
- final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
- avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
- }
+ // Perform only scale-up actions
+ return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
+ }
- return new CostMetrics(
- avgPartitionLag,
- currentTaskCount,
- partitionCount,
- pollIdleRatio,
- supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
- avgProcessingRate
- );
+ public CostBasedAutoScalerConfig getConfig()
+ {
+ return config;
}
/**
@@ -184,7 +160,7 @@ private CostMetrics collectMetrics()
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
*/
- public int computeOptimalTaskCount(CostMetrics metrics)
+ int computeOptimalTaskCount(CostMetrics metrics)
{
if (metrics == null) {
log.debug("No metrics available yet for supervisorId [%s]", supervisorId);
@@ -204,33 +180,28 @@ public int computeOptimalTaskCount(CostMetrics metrics)
return -1;
}
- // If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved.
- // No scaling is needed - maintain stability by staying at the current task count.
- final double currentIdleRatio = metrics.getPollIdleRatio();
- if (currentIdleRatio >= 0 && WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) {
- log.debug(
- "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no scaling needed",
- currentIdleRatio,
- supervisorId
- );
- return -1;
- }
-
int optimalTaskCount = -1;
- double optimalCost = Double.POSITIVE_INFINITY;
+ CostResult optimalCost = new CostResult();
for (int taskCount : validTaskCounts) {
- double cost = costFunction.computeCost(metrics, taskCount, config);
- log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
- if (cost < optimalCost) {
+ CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
+ double cost = costResult.totalCost();
+ log.debug(
+ "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)",
+ taskCount,
+ cost,
+ costResult.lagCost(),
+ costResult.idleCost()
+ );
+ if (cost < optimalCost.totalCost()) {
optimalTaskCount = taskCount;
- optimalCost = cost;
+ optimalCost = costResult;
}
}
- emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, metrics.getAvgPartitionLag()));
- emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC, metrics.getPollIdleRatio()));
emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount));
+ emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, optimalCost.lagCost()));
+ emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, optimalCost.idleCost()));
log.debug(
"Cost-based scaling evaluation for supervisorId [%s]: current=%d, optimal=%d, cost=%.4f, "
@@ -238,7 +209,7 @@ public int computeOptimalTaskCount(CostMetrics metrics)
supervisorId,
metrics.getCurrentTaskCount(),
optimalTaskCount,
- optimalCost,
+ optimalCost.totalCost(),
metrics.getAvgPartitionLag(),
metrics.getPollIdleRatio()
);
@@ -246,8 +217,8 @@ public int computeOptimalTaskCount(CostMetrics metrics)
if (optimalTaskCount == currentTaskCount) {
return -1;
}
- // Perform both scale-up and scale-down proactively
- // Future versions may perform scale-down on task rollover only
+
+ // Scale up is performed eagerly.
return optimalTaskCount;
}
@@ -264,26 +235,22 @@ static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
return new int[]{};
}
- List result = new ArrayList<>();
+ Set result = new HashSet<>();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
// Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
- final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK);
+ final int maxPartitionsPerTask = Math.min(
+ partitionCount,
+ currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
+ );
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
- if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
- result.add(taskCount);
- }
+ result.add(taskCount);
}
return result.stream().mapToInt(Integer::intValue).toArray();
}
- public CostBasedAutoScalerConfig getConfig()
- {
- return config;
- }
-
/**
* Extracts the average poll-idle-ratio metric from task stats.
* This metric indicates how much time the consumer spends idle waiting for data.
@@ -324,9 +291,9 @@ static double extractPollIdleRatio(Map> taskStats)
*
* @param taskStats the stats map from supervisor.getStats()
* @return the average 15-minute processing rate across all tasks in records/second,
- * or -1 if no valid metrics are available
+ * or -1 if no valid metrics are available
*/
- static double extractMovingAverage(Map> taskStats, String movingAveragePeriodKey)
+ static double extractMovingAverage(Map> taskStats)
{
if (taskStats == null || taskStats.isEmpty()) {
return -1;
@@ -341,9 +308,15 @@ static double extractMovingAverage(Map> taskStats, S
if (movingAveragesObj instanceof Map) {
Object buildSegmentsObj = ((Map, ?>) movingAveragesObj).get(RowIngestionMeters.BUILD_SEGMENTS);
if (buildSegmentsObj instanceof Map) {
- Object fifteenMinObj = ((Map, ?>) buildSegmentsObj).get(movingAveragePeriodKey);
- if (fifteenMinObj instanceof Map) {
- Object processedRate = ((Map, ?>) fifteenMinObj).get(RowIngestionMeters.PROCESSED);
+ Object movingAvgObj = ((Map, ?>) buildSegmentsObj).get(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME);
+ if (movingAvgObj == null) {
+ movingAvgObj = ((Map, ?>) buildSegmentsObj).get(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME);
+ if (movingAvgObj == null) {
+ movingAvgObj = ((Map, ?>) buildSegmentsObj).get(DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
+ }
+ }
+ if (movingAvgObj instanceof Map) {
+ Object processedRate = ((Map, ?>) movingAvgObj).get(RowIngestionMeters.PROCESSED);
if (processedRate instanceof Number) {
sum += ((Number) processedRate).doubleValue();
count++;
@@ -357,4 +330,47 @@ static double extractMovingAverage(Map> taskStats, S
return count > 0 ? sum / count : -1;
}
+
+ private CostMetrics collectMetrics()
+ {
+ if (spec.isSuspended()) {
+ log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId);
+ return null;
+ }
+
+ final LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId);
+ return null;
+ }
+
+ final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+ final int partitionCount = supervisor.getPartitionCount();
+
+ final Map> taskStats = supervisor.getStats();
+ final double movingAvgRate = extractMovingAverage(taskStats);
+ final double pollIdleRatio = extractPollIdleRatio(taskStats);
+
+ final double avgPartitionLag = lagStats.getAvgLag();
+
+ // Use an actual 15-minute moving average processing rate if available
+ final double avgProcessingRate;
+ if (movingAvgRate > 0) {
+ avgProcessingRate = movingAvgRate;
+ } else {
+ // Fallback: estimate processing rate based on the idle ratio
+ final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
+ avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
+ }
+
+ return new CostMetrics(
+ avgPartitionLag,
+ currentTaskCount,
+ partitionCount,
+ pollIdleRatio,
+ supervisor.getIoConfig().getTaskDuration().getStandardSeconds(),
+ avgProcessingRate
+ );
+ }
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
new file mode 100644
index 000000000000..42096dd61e17
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+/**
+ * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
+ * All costs are measured in seconds.
+ */
+public class CostResult
+{
+
+ private final double totalCost;
+ private final double lagCost;
+ private final double idleCost;
+
+ public CostResult()
+ {
+ this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
+ }
+
+ /**
+ * @param totalCost the weighted sum of lagCost and idleCost
+ * @param lagCost the weighted cost representing expected time (seconds) to recover current lag
+ * @param idleCost the weighted cost representing total compute time (seconds) wasted being idle per task duration
+ */
+ public CostResult(double totalCost, double lagCost, double idleCost)
+ {
+ this.totalCost = totalCost;
+ this.lagCost = lagCost;
+ this.idleCost = idleCost;
+ }
+
+ public double totalCost()
+ {
+ return totalCost;
+ }
+
+ public double lagCost()
+ {
+ return lagCost;
+ }
+
+ public double idleCost()
+ {
+ return idleCost;
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 1af8233527a5..0da733ef9e71 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -58,14 +58,15 @@ public static boolean isIdleInIdealRange(double idleRatio)
*
*
* Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}.
- * This approach directly connects costs to operational metricsю
+ * This approach directly connects costs to operational metrics.
*
- * @return cost score in seconds, or {@link Double#POSITIVE_INFINITY} for invalid inputs
+ * @return CostResult containing totalCost, lagCost, and idleCost,
+ * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
*/
- public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedAutoScalerConfig config)
+ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedAutoScalerConfig config)
{
if (metrics == null || config == null || proposedTaskCount <= 0 || metrics.getPartitionCount() <= 0) {
- return Double.POSITIVE_INFINITY;
+ return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
}
final double avgProcessingRate = metrics.getAvgProcessingRate();
@@ -74,9 +75,9 @@ public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedA
// Metrics are unavailable - favor maintaining the current task count.
// We're conservative about scale up, but won't let an unlikey scale down to happen.
if (proposedTaskCount == metrics.getCurrentTaskCount()) {
- return 0.01d;
+ return new CostResult(0.01d, 0.0, 0.0);
} else {
- return Double.POSITIVE_INFINITY;
+ return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
}
} else {
// Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
@@ -86,19 +87,21 @@ public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedA
final double predictedIdleRatio = estimateIdleRatio(metrics, proposedTaskCount);
final double idleCost = proposedTaskCount * metrics.getTaskDurationSeconds() * predictedIdleRatio;
- final double cost = config.getLagWeight() * lagRecoveryTime + config.getIdleWeight() * idleCost;
+ final double lagCost = config.getLagWeight() * lagRecoveryTime;
+ final double weightedIdleCost = config.getIdleWeight() * idleCost;
+ final double cost = lagCost + weightedIdleCost;
log.debug(
- "Cost for taskCount[%d]: lagRecoveryTime[%.2fs], idleCost[%.2fs], "
+ "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], "
+ "predictedIdle[%.3f], finalCost[%.2fs]",
proposedTaskCount,
- lagRecoveryTime,
- idleCost,
+ lagCost,
+ weightedIdleCost,
predictedIdleRatio,
cost
);
- return cost;
+ return new CostResult(cost, lagCost, weightedIdleCost);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index b61c93af3c8f..7794a798473b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -102,7 +102,9 @@ public void testCreateUpdateAndRemoveSupervisor()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
metadataSupervisorManager.insert("id1", spec);
supervisor3.start();
+ EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -115,6 +117,7 @@ public void testCreateUpdateAndRemoveSupervisor()
resetAll();
supervisor2.start();
+ EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
replayAll();
@@ -164,6 +167,7 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
metadataSupervisorManager.insert("id1", spec);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -225,6 +229,7 @@ public void testShouldUpdateSupervisor()
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
Assert.assertFalse(manager.shouldUpdateSupervisor(spec));
@@ -249,6 +254,7 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
exception.expect(DruidException.class);
replayAll();
manager.start();
@@ -332,6 +338,7 @@ public void testGetSupervisorStatus()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.expect(supervisor1.getStatus()).andReturn(report);
replayAll();
@@ -352,6 +359,7 @@ public void testHandoffTaskGroupsEarly()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));
replayAll();
@@ -373,6 +381,7 @@ public void testHandoffTaskGroupsEarlyOnNonStreamSupervisor()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+ EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
@@ -414,6 +423,8 @@ public void testStartIndividualSupervisorsFailStart()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+ EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor explosion"));
replayAll();
@@ -433,7 +444,9 @@ public void testNoPersistOnFailedStart()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
+ EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor2.start();
EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor failed to start"));
replayAll();
@@ -461,6 +474,7 @@ public void testStopThrowsException()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stopAsync();
EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
replayAll();
@@ -479,6 +493,7 @@ public void testResetSupervisor()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class));
replayAll();
@@ -498,6 +513,7 @@ public void testResetOnNonStreamSupervisor()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor3.start();
+ EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -533,6 +549,7 @@ public void testResetSupervisorWithSpecificOffsets()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.resetOffsets(datasourceMetadata);
replayAll();
@@ -558,7 +575,9 @@ public void testCreateSuspendResumeAndStopSupervisor()
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
metadataSupervisorManager.insert("id1", spec);
supervisor3.start();
+ EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.start();
@@ -574,6 +593,7 @@ public void testCreateSuspendResumeAndStopSupervisor()
resetAll();
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor2.start();
+ EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
supervisor1.stop(true);
replayAll();
@@ -589,6 +609,7 @@ public void testCreateSuspendResumeAndStopSupervisor()
metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert));
supervisor2.stop(true);
supervisor1.start();
+ EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
manager.suspendOrResumeSupervisor("id1", false);
@@ -631,29 +652,29 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec suspendedSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor suspendedSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes();
EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes();
EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(suspendedSpec);
+ EasyMock.replay(suspendedSpec, suspendedSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec activeSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes();
EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes();
EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(activeSpec);
+ EasyMock.replay(activeSpec, activeSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeSupervisorWithConcurrentLocks = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
@@ -664,11 +685,11 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
.andReturn(null).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
.andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes();
- EasyMock.replay(activeSpecWithConcurrentLocks);
+ EasyMock.replay(activeSpecWithConcurrentLocks, activeSupervisorWithConcurrentLocks);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor activeAppendSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes();
@@ -678,12 +699,12 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
- EasyMock.replay(activeAppendSpec);
+ EasyMock.replay(activeAppendSpec, activeAppendSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
// A supervisor with useConcurrentLocks set to false explicitly must not use an append lock
SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class);
+ Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
@@ -697,7 +718,7 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
- EasyMock.replay(specWithUseConcurrentLocksFalse);
+ EasyMock.replay(specWithUseConcurrentLocksFalse, supervisorWithUseConcurrentLocksFalse);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
replayAll();
@@ -737,7 +758,7 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor()
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
SeekableStreamSupervisorSpec streamingSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
- SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
+ SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes();
@@ -746,7 +767,7 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor()
EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes();
EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes();
- EasyMock.replay(streamingSpec);
+ EasyMock.replay(streamingSpec, streamSupervisor);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
EasyMock.expectLastCall().once();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 58ffb8438e0d..f0b15f8c8a09 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -62,6 +63,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -678,6 +680,103 @@ public void testAutoScalerCreated()
Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
}
+ @Test
+ public void testAutoScalerReturnsNoopWhenSupervisorIsNotSeekableStreamSupervisor()
+ {
+ // Test the branch where supervisor instanceof SeekableStreamSupervisor is false
+ HashMap autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("taskCountMax", 8);
+ autoScalerConfig.put("taskCountMin", 1);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
+ .anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+ EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+ EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+
+ // Create a non-SeekableStreamSupervisor mock
+ Supervisor nonSeekableStreamSupervisor = EasyMock.mock(Supervisor.class);
+ EasyMock.replay(nonSeekableStreamSupervisor);
+
+ // When passing a non-SeekableStreamSupervisor, should return NoopTaskAutoScaler
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(nonSeekableStreamSupervisor);
+ Assert.assertTrue(
+ "Should return NoopTaskAutoScaler when supervisor is not SeekableStreamSupervisor",
+ autoscaler instanceof NoopTaskAutoScaler
+ );
+ }
+
+ @Test
+ public void testAutoScalerReturnsNoopWhenConfigIsNull()
+ {
+ // Test the branch where autoScalerConfig is null
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
+ .andReturn(null)
+ .anyTimes();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+ EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+ EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1"
+ );
+
+ // When autoScalerConfig is null, should return NoopTaskAutoScaler
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(
+ "Should return NoopTaskAutoScaler when autoScalerConfig is null",
+ autoscaler instanceof NoopTaskAutoScaler
+ );
+ }
+
@Test
public void testDefaultAutoScalerConfigCreatedWithDefault()
{
@@ -857,71 +956,6 @@ public int getActiveTaskGroupsCount()
autoScaler.stop();
}
- @Test
- public void test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() throws InterruptedException
- {
- EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
- EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
- EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
- EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
- EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
- EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
- EasyMock.replay(spec);
-
- EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
- EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
- EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
- EasyMock.replay(ingestionSchema);
-
- EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
- EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
- EasyMock.replay(taskMaster);
-
- StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
- TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
-
- LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
- supervisor,
- DATASOURCE,
- mapper.convertValue(
- getScaleOutProperties(2),
- LagBasedAutoScalerConfig.class
- ),
- spec,
- dynamicActionEmitter
- );
-
- supervisor.addTaskGroupToPendingCompletionTaskGroup(
- 0,
- ImmutableMap.of("0", "0"),
- null,
- null,
- Set.of("dummyTask"),
- Collections.emptySet()
- );
-
- supervisor.start();
- autoScaler.start();
-
- supervisor.runInternal();
- Thread.sleep(1000); // ensure a dynamic allocation notice completes
-
- Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
- Assert.assertTrue(
- dynamicActionEmitter
- .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
- .stream()
- .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
- .filter(Objects::nonNull)
- .anyMatch("There are tasks pending completion"::equals)
- );
-
- emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
- autoScaler.reset();
- autoScaler.stop();
- }
-
@Test
public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException
{
@@ -1593,6 +1627,175 @@ public String getSource()
originalSpec.validateSpecUpdateTo(proposedSpecSameSource);
}
+ @Test
+ public void test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ dynamicActionEmitter
+ );
+
+ supervisor.addTaskGroupToPendingCompletionTaskGroup(
+ 0,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of("dummyTask"),
+ Collections.emptySet()
+ );
+
+ supervisor.start();
+ autoScaler.start();
+
+ supervisor.runInternal();
+ Thread.sleep(1000); // ensure a dynamic allocation notice completes
+
+ Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue());
+ Assert.assertTrue(
+ dynamicActionEmitter
+ .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
+ .stream()
+ .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
+ .filter(Objects::nonNull)
+ .anyMatch("There are tasks pending completion"::equals)
+ );
+
+ emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC);
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void test_dynamicAllocationNotice_skips_whenSupervisorSuspended() throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ // Suspended → DynamicAllocationTasksNotice should return early and not scale
+ EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ emitter
+ );
+
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+
+ int before = supervisor.getIoConfig().getTaskCount();
+ Thread.sleep(1000);
+ int after = supervisor.getIoConfig().getTaskCount();
+ // No scaling expected because supervisor is suspended
+ Assert.assertEquals(before, after);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void test_changeTaskCountInIOConfig_handlesExceptionAndStillUpdatesTaskCount() throws InterruptedException
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ // SupervisorManager present but metadata insert fails → should be handled
+ SupervisorManager sm = EasyMock.createMock(SupervisorManager.class);
+ MetadataSupervisorManager msm = EasyMock.createMock(MetadataSupervisorManager.class);
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(sm)).anyTimes();
+ EasyMock.expect(sm.getMetadataSupervisorManager()).andReturn(msm).anyTimes();
+ msm.insert(EasyMock.anyString(), EasyMock.anyObject());
+ EasyMock.expectLastCall().andThrow(new RuntimeException("boom")).anyTimes();
+ EasyMock.replay(taskMaster, sm, msm);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ emitter
+ );
+
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+
+ int before = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, before);
+ Thread.sleep(1000); // allow one dynamic allocation cycle
+ int after = supervisor.getIoConfig().getTaskCount();
+ // Even though metadata insert failed, taskCount should still be updated in ioConfig
+ Assert.assertEquals(2, after);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testMergeSpecConfigs()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
new file mode 100644
index 000000000000..3e4bcd92b99f
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends EasyMockSupport
+{
+ private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+ private static final String STREAM = "stream";
+ private static final String DATASOURCE = "testDS";
+ private static final String SUPERVISOR = "supervisor";
+ private static final int DEFAULT_TASK_COUNT = 10;
+
+ private TaskStorage taskStorage;
+ private TaskMaster taskMaster;
+ private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+ private ServiceEmitter emitter;
+ private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private SeekableStreamIndexTaskClientFactory taskClientFactory;
+ private SeekableStreamSupervisorSpec spec;
+ private SupervisorStateManagerConfig supervisorConfig;
+
+ @Before
+ public void setUp()
+ {
+ taskStorage = EasyMock.mock(TaskStorage.class);
+ taskMaster = EasyMock.mock(TaskMaster.class);
+ indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
+ emitter = new StubServiceEmitter();
+ rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
+ taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+ spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ supervisorConfig = new SupervisorStateManagerConfig();
+
+ // Common taskMaster setup - used by all tests
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+ }
+
+ @Test
+ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithoutAutoScaler(5));
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
+ supervisor.start();
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when taskAutoScaler is null",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(-1))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when rolloverTaskCount <= 0",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling()
+ {
+ // Given
+ final int targetTaskCount = 5;
+
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(targetTaskCount))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0",
+ targetTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ @Test
+ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
+ {
+ // Given
+ setupSpecExpectations(getIOConfigWithCostBasedAutoScaler());
+ EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
+ .andReturn(createMockAutoScaler(0))
+ .anyTimes();
+ EasyMock.replay(spec);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100);
+ supervisor.start();
+ supervisor.createAutoscaler(spec);
+
+ int beforeTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // When
+ supervisor.maybeScaleDuringTaskRollover();
+
+ // Then
+ Assert.assertEquals(
+ "Task count should not change when rolloverTaskCount is 0",
+ beforeTaskCount,
+ (int) supervisor.getIoConfig().getTaskCount()
+ );
+ }
+
+ // Helper methods for test setup
+
+ /**
+ * Sets up common spec expectations. Call EasyMock.replay(spec) after this and any additional expectations.
+ */
+ private void setupSpecExpectations(SeekableStreamSupervisorIOConfig ioConfig)
+ {
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ }
+
+ /**
+ * Creates a mock autoscaler that returns the specified rollover count.
+ */
+ private static SupervisorTaskAutoScaler createMockAutoScaler(int rolloverCount)
+ {
+ return new SupervisorTaskAutoScaler()
+ {
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public void reset()
+ {
+ }
+
+ @Override
+ public int computeTaskCountForRollover()
+ {
+ return rolloverCount;
+ }
+ };
+ }
+
+ // Helper methods for config creation
+
+ private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
+ {
+ return CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.3)
+ .idleWeight(0.7)
+ .scaleActionPeriodMillis(100)
+ .build();
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfigWithCostBasedAutoScaler()
+ {
+ return createIOConfig(DEFAULT_TASK_COUNT, getCostBasedAutoScalerConfig());
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfigWithoutAutoScaler(int taskCount)
+ {
+ return createIOConfig(taskCount, null);
+ }
+
+ private SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, CostBasedAutoScalerConfig autoScalerConfig)
+ {
+ return new SeekableStreamSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ autoScalerConfig,
+ LagAggregator.DEFAULT,
+ null,
+ null,
+ null
+ )
+ {
+ };
+ }
+
+ private static DataSchema getDataSchema()
+ {
+ List dimensions = new ArrayList<>();
+ dimensions.add(StringDimensionSchema.create("dim1"));
+ dimensions.add(StringDimensionSchema.create("dim2"));
+
+ return DataSchema.builder()
+ .withDataSource(DATASOURCE)
+ .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+ .withDimensions(dimensions)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ ImmutableList.of()
+ )
+ )
+ .build();
+ }
+
+ private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+ {
+ return new SeekableStreamSupervisorTuningConfig()
+ {
+ @Override
+ public Integer getWorkerThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Long getChatRetries()
+ {
+ return 1L;
+ }
+
+ @Override
+ public Duration getHttpTimeout()
+ {
+ return new Period("PT1M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getShutdownTimeout()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+
+ @Override
+ public Duration getRepartitionTransitionDuration()
+ {
+ return new Period("PT2M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getOffsetFetchPeriod()
+ {
+ return new Period("PT5M").toStandardDuration();
+ }
+
+ @Override
+ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+ {
+ return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null, null, null,
+ null, null, null
+ )
+ {
+ @Override
+ public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+ {
+ return null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return null;
+ }
+ };
+ }
+ };
+ }
+
+ // Inner test classes
+
+ private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor
+ {
+ private BaseTestSeekableStreamSupervisor()
+ {
+ super(
+ "testSupervisorId",
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory,
+ false
+ );
+ }
+
+ @Override
+ protected String baseTaskName()
+ {
+ return "test";
+ }
+
+ @Override
+ protected void updatePartitionLagFromStream()
+ {
+ }
+
+ @Nullable
+ @Override
+ protected Map getPartitionRecordLag()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected Map getPartitionTimeLag()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+ int groupId,
+ Map startPartitions,
+ Map endPartitions,
+ String baseSequenceName,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ Set exclusiveStartSequenceNumberPartitions,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ return new SeekableStreamIndexTaskIOConfig<>(
+ groupId,
+ baseSequenceName,
+ new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
+ new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ ioConfig.getInputFormat(),
+ ioConfig.getTaskDuration().getStandardMinutes()
+ )
+ {
+ };
+ }
+
+ @Override
+ protected List> createIndexTasks(
+ int replicas,
+ String baseSequenceName,
+ ObjectMapper sortingMapper,
+ TreeMap> sequenceOffsets,
+ SeekableStreamIndexTaskIOConfig taskIoConfig,
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected int getTaskGroupIdForPartition(String partition)
+ {
+ return 0;
+ }
+
+ @Override
+ protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean doesTaskMatchSupervisor(Task task)
+ {
+ return true;
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ String stream,
+ Map map
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean isExclusive)
+ {
+ return new OrderedSequenceNumber<>(seq, isExclusive)
+ {
+ @Override
+ public int compareTo(OrderedSequenceNumber o)
+ {
+ return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
+ }
+ };
+ }
+
+ @Override
+ protected Map getRecordLagPerPartition(Map currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected Map getTimeLagPerPartition(Map currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected RecordSupplier setupRecordSupplier()
+ {
+ return recordSupplier;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorReportPayload createReportPayload(
+ int numPartitions,
+ boolean includeOffsets
+ )
+ {
+ return new SeekableStreamSupervisorReportPayload<>(SUPERVISOR, DATASOURCE, STREAM, 1, 1, 1L,
+ null, null, null, null, null, null,
+ false, true, null, null, null
+ )
+ {
+ };
+ }
+
+ @Override
+ protected String getNotSetMarker()
+ {
+ return "NOT_SET";
+ }
+
+ @Override
+ protected String getEndOfPartitionMarker()
+ {
+ return "EOF";
+ }
+
+ @Override
+ protected boolean isEndOfShard(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean isShardExpirationMarker(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+ {
+ return false;
+ }
+ }
+
+ private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
+ {
+ private final int partitionNumbers;
+
+ public TestSeekableStreamSupervisor(int partitionNumbers)
+ {
+ this.partitionNumbers = partitionNumbers;
+ }
+
+ @Override
+ protected void scheduleReporting(ScheduledExecutorService reportingExec)
+ {
+ }
+
+ @Override
+ public LagStats computeLagStats()
+ {
+ return new LagStats(0, 0, 0);
+ }
+
+ @Override
+ public int getPartitionCount()
+ {
+ return partitionNumbers;
+ }
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 5df9edd184d2..71e798e7e2fa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2751,6 +2751,28 @@ public void testMaxAllowedStopsWithStopTaskCountRatio()
Assert.assertEquals(12, config.getMaxAllowedStops());
}
+ @Test
+ public void testCreateAutoscalerStoresAndReturnsAutoscaler()
+ {
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+ EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ // Test that createAutoscaler returns null when spec returns null
+ SeekableStreamSupervisorSpec mockSpec = EasyMock.createMock(SeekableStreamSupervisorSpec.class);
+ EasyMock.expect(mockSpec.createAutoscaler(supervisor)).andReturn(null).once();
+ EasyMock.replay(mockSpec);
+
+ Assert.assertNull(supervisor.createAutoscaler(mockSpec));
+ EasyMock.verify(mockSpec);
+
+ verifyAll();
+ }
+
private static DataSchema getDataSchema()
{
List dimensions = new ArrayList<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 44bbef9c6a15..caf5453f5217 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
-import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -35,6 +34,9 @@
import java.util.HashMap;
import java.util.Map;
+import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME;
+import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME;
+import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME;
import static org.mockito.Mockito.when;
public class CostBasedAutoScalerTest
@@ -69,12 +71,23 @@ public void testComputeValidTaskCounts()
{
// For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34
int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 25);
+ Assert.assertTrue("Should contain the current task count", contains(validTaskCounts, 25));
+ Assert.assertTrue("Should contain the next scale-up option", contains(validTaskCounts, 34));
- Assert.assertTrue("Should contain current task count", contains(validTaskCounts, 25));
- Assert.assertTrue("Should contain next scale-up option", contains(validTaskCounts, 34));
+ // Edge cases
+ Assert.assertEquals("Zero partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
+ Assert.assertEquals("Negative partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length);
- // Edge case: zero partitions returns empty array
- Assert.assertEquals(0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length);
+ // Single partition
+ int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1);
+ Assert.assertTrue("Single partition should have at least one valid count", singlePartition.length > 0);
+ Assert.assertTrue("Single partition should contain 1", contains(singlePartition, 1));
+
+ // Current exceeds partitions - should still yield valid, deduplicated options
+ int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5);
+ Assert.assertEquals(2, exceedsPartitions.length);
+ Assert.assertTrue(contains(exceedsPartitions, 1));
+ Assert.assertTrue(contains(exceedsPartitions, 2));
}
@Test
@@ -82,42 +95,24 @@ public void testComputeOptimalTaskCountInvalidInputs()
{
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null));
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0)));
- }
-
- @Test
- public void testComputeOptimalTaskCountIdleInIdealRange()
- {
- // When idle is in ideal range [0.2, 0.6], no scaling should occur
- Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(5000.0, 25, 100, 0.4)));
+ Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
+ Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
}
@Test
public void testComputeOptimalTaskCountScaling()
{
// High idle (underutilized) - should scale down
- // With high idle (0.8), the algorithm evaluates lower task counts and finds they have lower idle cost
int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 25);
- }
- @Test
- public void testComputeOptimalTaskCountLowIdleDoesNotScaleUpWithBalancedWeights()
- {
- // With corrected idle ratio model and marginal lag model, low idle does not
- // automatically trigger scale-up. The algorithm is conservative because:
- // 1. Scale-up increases idle cost (more tasks = more idle per task with fixed load)
- // 2. Marginal lag model means only ADDITIONAL tasks work on backlog
- //
- // This is intentional: the idle-heavy weights (0.4 idle) make the algorithm
- // favor stability over aggressive scaling
- int result = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
-
- // Algorithm evaluates costs and may find current count optimal
- // or may scale down if idle cost reduction outweighs lag increase
- Assert.assertTrue(
- "With low idle and balanced weights, algorithm should not scale up aggressively",
- result == -1 || result <= 25
- );
+ // Very high idle with high task count - should scale down
+ int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
+ Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50);
+
+ // With low idle and balanced weights, algorithm should not scale up aggressively
+ int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
+ Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25);
}
@Test
@@ -142,35 +137,219 @@ public void testExtractPollIdleRatio()
}
@Test
- public void testExtractProcessingRateMovingAverage()
+ public void testExtractPollIdleRatioInvalidTypes()
+ {
+ // Non-map task metric
+ Map> nonMapTask = new HashMap<>();
+ nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+ Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001);
+
+ // Empty autoscaler metrics
+ Map> emptyAutoscaler = new HashMap<>();
+ Map taskStats1 = new HashMap<>();
+ taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>());
+ emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
+ Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001);
+
+ // Non-map autoscaler metrics
+ Map> nonMapAutoscaler = new HashMap<>();
+ Map taskStats2 = new HashMap<>();
+ taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, "not-a-map");
+ nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
+ Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001);
+
+ // Non-number poll idle ratio
+ Map> nonNumberRatio = new HashMap<>();
+ Map taskStats3 = new HashMap<>();
+ Map autoscalerMetrics = new HashMap<>();
+ autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY, "not-a-number");
+ taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, autoscalerMetrics);
+ nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
+ Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverage()
{
// Null and empty return -1
- Assert.assertEquals(
- -1.,
- CostBasedAutoScaler.extractMovingAverage(null, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME),
- 0.0001
- );
- Assert.assertEquals(
- -1.,
- CostBasedAutoScaler.extractMovingAverage(
- Collections.emptyMap(),
- DropwizardRowIngestionMeters.FIVE_MINUTE_NAME
- ),
- 0.0001
- );
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001);
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001);
// Missing metrics return -1
Map> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new HashMap<>()));
- Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingMetrics, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001);
- // Valid stats return average
+ // Valid stats return average (using 5-minute)
Map> validStats = new HashMap<>();
Map group = new HashMap<>();
group.put("task-0", buildTaskStatsWithMovingAverage(1000.0));
group.put("task-1", buildTaskStatsWithMovingAverage(2000.0));
validStats.put("0", group);
- Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001);
+ Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverageIntervalFallback()
+ {
+ // 15-minute average is preferred
+ Map> fifteenMin = new HashMap<>();
+ fifteenMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0)));
+ Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001);
+
+ // 1-minute as final fallback
+ Map> oneMin = new HashMap<>();
+ oneMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0)));
+ Assert.assertEquals(500.0, CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001);
+
+ // 15-minute preferred over 5-minute when both available
+ Map> allIntervals = new HashMap<>();
+ allIntervals.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0)));
+ Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001);
+
+ // Falls back to 5-minute when 15-minute is null
+ Map> nullFifteen = new HashMap<>();
+ nullFifteen.put("0", Collections.singletonMap("task-0", buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0)));
+ Assert.assertEquals(750.0, CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001);
+
+ // Falls back to 1-minute when both 15 and 5 are null
+ Map> bothNull = new HashMap<>();
+ bothNull.put("0", Collections.singletonMap("task-0", buildTaskStatsWithTwoNullIntervals(250.0)));
+ Assert.assertEquals(250.0, CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001);
+ }
+
+ @Test
+ public void testExtractMovingAverageInvalidTypes()
+ {
+ // Non-map task metric
+ Map> nonMapTask = new HashMap<>();
+ nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001);
+
+ // Missing buildSegments
+ Map> missingBuild = new HashMap<>();
+ Map taskStats1 = new HashMap<>();
+ taskStats1.put("movingAverages", new HashMap<>());
+ missingBuild.put("0", Collections.singletonMap("task-0", taskStats1));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001);
+
+ // Non-map movingAverages
+ Map> nonMapMA = new HashMap<>();
+ Map taskStats2 = new HashMap<>();
+ taskStats2.put("movingAverages", "not-a-map");
+ nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001);
+
+ // Non-map buildSegments
+ Map> nonMapBS = new HashMap<>();
+ Map taskStats3 = new HashMap<>();
+ Map movingAverages3 = new HashMap<>();
+ movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map");
+ taskStats3.put("movingAverages", movingAverages3);
+ nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001);
+
+ // Non-map interval data
+ Map> nonMapInterval = new HashMap<>();
+ Map taskStats4 = new HashMap<>();
+ Map movingAverages4 = new HashMap<>();
+ Map buildSegments4 = new HashMap<>();
+ buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map");
+ movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4);
+ taskStats4.put("movingAverages", movingAverages4);
+ nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001);
+
+ // Non-number processed rate
+ Map> nonNumberRate = new HashMap<>();
+ Map taskStats5 = new HashMap<>();
+ Map movingAverages5 = new HashMap<>();
+ Map buildSegments5 = new HashMap<>();
+ Map fifteenMin = new HashMap<>();
+ fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number");
+ buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin);
+ movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5);
+ taskStats5.put("movingAverages", movingAverages5);
+ nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5));
+ Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001);
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended()
+ {
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(true);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter);
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull()
+ {
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(false);
+ when(supervisor.computeLagStats()).thenReturn(null);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter);
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
+ }
+
+ @Test
+ public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics()
+ {
+ // Tests the case where lastKnownMetrics is null (no computeTaskCountForScaleAction called)
+ SupervisorSpec spec = Mockito.mock(SupervisorSpec.class);
+ SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class);
+ ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class);
+ SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(spec.getId()).thenReturn("s-up");
+ when(spec.isSuspended()).thenReturn(false);
+ when(supervisor.getIoConfig()).thenReturn(ioConfig);
+ when(ioConfig.getStream()).thenReturn("stream");
+
+ CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
+ .build();
+
+ CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter);
+ // Should return -1 when lastKnownMetrics is null
+ Assert.assertEquals(-1, scaler.computeTaskCountForRollover());
}
private CostMetrics createMetrics(
@@ -213,7 +392,68 @@ private Map buildTaskStatsWithPollIdle(double pollIdleRatio)
private Map buildTaskStatsWithMovingAverage(double processedRate)
{
Map buildSegments = new HashMap<>();
- buildSegments.put(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, processedRate));
+ buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, processedRate));
+
+ Map movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map buildTaskStatsWithMovingAverageForInterval(String intervalName, double processedRate)
+ {
+ Map buildSegments = new HashMap<>();
+ buildSegments.put(intervalName, Map.of(RowIngestionMeters.PROCESSED, processedRate));
+
+ Map movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map buildTaskStatsWithMultipleMovingAverages(
+ double fifteenMinRate,
+ double fiveMinRate,
+ double oneMinRate
+ )
+ {
+ Map buildSegments = new HashMap<>();
+ buildSegments.put(FIFTEEN_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, fifteenMinRate));
+ buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, fiveMinRate));
+ buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, oneMinRate));
+
+ Map movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map buildTaskStatsWithNullInterval(String nullInterval, String validInterval, double processedRate)
+ {
+ Map buildSegments = new HashMap<>();
+ buildSegments.put(nullInterval, null);
+ buildSegments.put(validInterval, Map.of(RowIngestionMeters.PROCESSED, processedRate));
+
+ Map movingAverages = new HashMap<>();
+ movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
+
+ Map taskStats = new HashMap<>();
+ taskStats.put("movingAverages", movingAverages);
+ return taskStats;
+ }
+
+ private Map buildTaskStatsWithTwoNullIntervals(double oneMinRate)
+ {
+ Map buildSegments = new HashMap<>();
+ buildSegments.put(FIFTEEN_MINUTE_NAME, null);
+ buildSegments.put(FIVE_MINUTE_NAME, null);
+ buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, oneMinRate));
Map movingAverages = new HashMap<>();
movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index 0c5602052d4d..90b50477c924 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -47,11 +47,11 @@ public void testComputeCostInvalidInputs()
{
CostMetrics validMetrics = createMetrics(100000.0, 10, 100, 0.3);
- Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(null, 10, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config), 0.0);
- Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(null, 10, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0);
+ Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), 0.0);
}
@Test
@@ -68,8 +68,8 @@ public void testScaleDownHasHigherLagCostThanCurrent()
CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3);
- double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
- double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig);
+ double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost();
+ double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig).totalCost();
// Scale down uses absolute model: lag / (5 * rate) = higher recovery time
// Current uses absolute model: lag / (10 * rate) = lower recovery time
@@ -97,15 +97,15 @@ public void testLagCostWithMarginalModel()
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
// Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s
- double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig);
+ double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost();
Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1);
// Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666
- double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig);
+ double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig).totalCost();
Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1);
// Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s
- double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig);
+ double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig).totalCost();
Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01);
// Adding more tasks reduces lag recovery time
@@ -120,8 +120,8 @@ public void testBalancedWeightsFavorStabilityOverScaleUp()
// This is intentional behavior: the algorithm is conservative about scale-up.
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3);
- double costCurrent = costFunction.computeCost(metrics, 10, config);
- double costScaleUp = costFunction.computeCost(metrics, 20, config);
+ double costCurrent = costFunction.computeCost(metrics, 10, config).totalCost();
+ double costScaleUp = costFunction.computeCost(metrics, 20, config).totalCost();
// With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from
// scaling up dominates the lag recovery benefit
@@ -154,8 +154,8 @@ public void testWeightsAffectCost()
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.1);
- double costLag = costFunction.computeCost(metrics, 10, lagOnly);
- double costIdle = costFunction.computeCost(metrics, 10, idleOnly);
+ double costLag = costFunction.computeCost(metrics, 10, lagOnly).totalCost();
+ double costIdle = costFunction.computeCost(metrics, 10, idleOnly).totalCost();
Assert.assertNotEquals("Different weights should produce different costs", costLag, costIdle, 0.0001);
Assert.assertTrue("Lag-only cost should be positive", costLag > 0.0);
@@ -170,9 +170,9 @@ public void testNoProcessingRateFavorsCurrentTaskCount()
int currentTaskCount = 10;
CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.3, 0.0);
- double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config);
- double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config);
- double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config);
+ double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config).totalCost();
+ double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config).totalCost();
+ double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config).totalCost();
Assert.assertTrue(
"Cost at current should be less than cost for scale up",
@@ -201,8 +201,8 @@ public void testNoProcessingRateDeviationPenaltyIsSymmetric()
.defaultProcessingRate(1000.0)
.build();
- double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig);
- double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig);
+ double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost();
+ double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost();
Assert.assertEquals(
"Lag cost for +5 and -5 deviation should be equal",
@@ -229,10 +229,10 @@ public void testIdleCostMonotonicWithTaskCount()
// Current: 10 tasks with 40% idle (60% busy)
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
- double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig);
- double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig);
- double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig);
- double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig);
+ double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost();
+ double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost();
+ double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig).totalCost();
+ double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost();
// Monotonically increasing idle cost as tasks increase
Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10);
@@ -256,7 +256,7 @@ public void testIdleRatioClampingAtBoundaries()
// busyFraction = 0.6, taskRatio = 0.2
// predictedIdle = 1 - 0.6/0.2 = 1 - 3 = -2 → clamped to 0
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
- double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig);
+ double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig).totalCost();
// idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0
Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped to 0", 0.0, costAt2, 0.0001);
@@ -266,7 +266,7 @@ public void testIdleRatioClampingAtBoundaries()
// busyFraction = 0.9, taskRatio = 10
// predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds)
CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1);
- double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig);
+ double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig).totalCost();
// idlenessCost = 100 * 3600 * 0.91 = 327600
Assert.assertTrue("Cost should be finite and positive", Double.isFinite(costAt100) && costAt100 > 0);
}
@@ -286,8 +286,8 @@ public void testIdleRatioWithMissingData()
// Negative idle ratio indicates missing data → should default to 0.5
CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0);
- double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig);
- double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig);
+ double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig).totalCost();
+ double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig).totalCost();
// With missing data, predicted idle = 0.5 for all task counts
// idlenessCost at 10 = 10 * 3600 * 0.5 = 18000
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 30a0d7a723e4..ce6c87e5e082 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
@@ -86,6 +87,11 @@ default Boolean isHealthy()
return null; // default implementation for interface compatability; returning null since true or false is misleading
}
+ default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec)
+ {
+ return spec.createAutoscaler(this);
+ }
+
/**
* Resets any stored metadata by the supervisor.
* @param dataSourceMetadata optional dataSource metadata.
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
index c921e2740b87..17cd347231ae 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -24,4 +24,15 @@ public interface SupervisorTaskAutoScaler
void start();
void stop();
void reset();
+
+ /**
+ * Computes the optimal task count during task rollover, allowing a non-disruptive scale-down.
+ * Must be called by the supervisor when tasks are ending their duration.
+ *
+ * @return optimal task count for scale-down, or -1 if no change needed
+ */
+ default int computeTaskCountForRollover()
+ {
+ return -1;
+ }
}