diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java index c5b86688ea1c..140c4626d220 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java @@ -41,7 +41,7 @@ import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.joda.time.Seconds; +import org.joda.time.Period; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -59,6 +59,7 @@ *

* Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time). */ +@SuppressWarnings("resource") public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase { private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName(); @@ -95,18 +96,11 @@ public void stop() } }; - // Increase worker capacity to handle more tasks indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s") - .addProperty("druid.worker.capacity", "60"); - - overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}") - .addProperty("druid.manager.segments.useIncrementalCache", "ifSynced") - .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); - - coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced"); + .addProperty("druid.worker.capacity", "100"); cluster.useLatchableEmitter() - .useDefaultTimeoutForLatchableEmitter(120) + .useDefaultTimeoutForLatchableEmitter(60) .addServer(coordinator) .addServer(overlord) .addServer(indexer) @@ -162,7 +156,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown() } @Test - @Timeout(125) public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp() { final String superId = dataSource + "_super_scaleup"; @@ -215,6 +208,63 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp() cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec()); } + @Test + void test_scaleDownDuringTaskRollover() + { + final String superId = dataSource + "_super"; + final int initialTaskCount = 10; + + final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig + .builder() + .enableTaskAutoScaler(true) + .taskCountMin(1) + .taskCountMax(10) + .taskCountStart(initialTaskCount) + .scaleActionPeriodMillis(2000) + .minTriggerScaleActionFrequencyMillis(2000) + // High idle weight ensures scale-down when tasks are mostly idle 
(little data to process) + .lagWeight(0.1) + .idleWeight(0.9) + .build(); + + final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount); + + // Submit the supervisor + Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec)); + + // Wait for at least one task running for the datasource managed by the supervisor. + overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time") + .hasDimension(DruidMetrics.DATASOURCE, dataSource)); + + // Wait for autoscaler to emit metric indicating scale-down, it should be just less than the current task count. + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC) + .hasValueMatching(Matchers.lessThan((long) initialTaskCount))); + + // Wait for tasks to complete (first rollover) + overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/action/success/count")); + + // Wait for the task running for the datasource managed by a supervisor. + overlord.latchableEmitter().waitForEvent(e -> e.hasMetricName("task/run/time") + .hasDimension(DruidMetrics.DATASOURCE, dataSource)); + + // After rollover, verify that the running task count has decreased + // The autoscaler should have recommended fewer tasks due to high idle time + final int postRolloverRunningTasks = cluster.callApi().getTaskCount("running", dataSource); + + Assertions.assertTrue( + postRolloverRunningTasks < initialTaskCount, + StringUtils.format( + "Expected running task count to decrease after rollover. 
Initial: %d, After rollover: %d", + initialTaskCount, + postRolloverRunningTasks + ) + ); + + // Suspend the supervisor to clean up + cluster.callApi().postSupervisor(spec.createSuspendedSpec()); + } + private void produceRecordsToKafka(int recordCount, int iterations) { int recordCountPerSlice = recordCount / iterations; @@ -258,7 +308,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler( ioConfig -> ioConfig .withConsumerProperties(kafkaServer.consumerProperties()) .withTaskCount(taskCount) - .withTaskDuration(Seconds.THREE.toPeriod()) + .withTaskDuration(Period.seconds(7)) .withAutoScalerConfig(autoScalerConfig) ) .withId(supervisorId) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 21d2a6265011..0cd799bed547 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -502,7 +502,7 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe SupervisorTaskAutoScaler autoscaler; try { supervisor = spec.createSupervisor(); - autoscaler = spec.createAutoscaler(supervisor); + autoscaler = supervisor.createAutoscaler(spec); supervisor.start(); if (autoscaler != null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 90adc68c799a..c5d3ffe873d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -61,8 +61,10 @@ 
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -885,7 +887,7 @@ public String getType() /** * Tag for identifying this supervisor in thread-names, listeners, etc. tag = (type + supervisorId). - */ + */ private final String supervisorTag; private final TaskInfoProvider taskInfoProvider; private final RowIngestionMetersFactory rowIngestionMetersFactory; @@ -926,6 +928,12 @@ public String getType() private volatile boolean lifecycleStarted = false; private final ServiceEmitter emitter; + /** + * Reference to the autoscaler, used for rollover-based scale-down decisions. + * Wired by {@link SupervisorManager} after supervisor creation. 
+ */ + private volatile SupervisorTaskAutoScaler taskAutoScaler; + // snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check private final Map previousSequencesFromStream = new HashMap<>(); private long lastActiveTimeMillis; @@ -1306,7 +1314,7 @@ public void tryInit() if (log.isDebugEnabled()) { log.debug( "Handled notice[%s] from notices queue in [%d] ms, " - + "current notices queue size [%d] for supervisor[%s] for datasource[%s].", + + "current notices queue size [%d] for supervisor[%s] for datasource[%s].", noticeType, noticeHandleTime.millisElapsed(), getNoticesQueueSize(), supervisorId, dataSource ); } @@ -1677,6 +1685,13 @@ private List getCurrentParseErrors() return limitedParseErrors; } + @Override + public SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec) + { + this.taskAutoScaler = spec.createAutoscaler(this); + return this.taskAutoScaler; + } + @VisibleForTesting public TaskGroup addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, @@ -3428,6 +3443,29 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException // remove this task group from the list of current task groups now that it has been handled activelyReadingTaskGroups.remove(groupId); } + + maybeScaleDuringTaskRollover(); + } + + /** + * Scales up or down the number of tasks during a task rollover, if applicable. + *

+ * Does nothing unless an auto-scaler is attached and no task group is actively reading. In that case a positive + * recommendation from the auto-scaler is written to the IO config and the allocation info is cleared. + */ + @VisibleForTesting + void maybeScaleDuringTaskRollover() + { + if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) { + int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover(); + if (rolloverTaskCount > 0) { + log.info("Autoscaler recommends scaling to [%d] tasks during rollover", rolloverTaskCount); + changeTaskCountInIOConfig(rolloverTaskCount); + // Clear the allocation info so that task allocation is re-calculated on the next runInternal() iteration. + // This is how the changed task count gets injected into the rollover. + clearAllocationInfo(); + } + } } private DateTime computeEarliestTaskStartTime(TaskGroup group) @@ -4688,9 +4726,9 @@ protected void emitLag() // Try emitting lag even with stale metrics provided that none of the partitions has negative lag final long staleMillis = sequenceLastUpdated == null - ? 0 - : DateTimes.nowUtc().getMillis() - - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis()); + ? 
0 + : DateTimes.nowUtc().getMillis() + - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis()); if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) { // Log at most once every twenty supervisor runs to reduce noise in the logs if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 9ce08f936602..14e3ca2cf5e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -33,10 +33,9 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.RowIngestionMeters; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -58,8 +57,8 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2; private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2; - public static final String AVG_LAG_METRIC = "task/autoScaler/costBased/avgLag"; - public static final String AVG_IDLE_METRIC = "task/autoScaler/costBased/pollIdleAvg"; + public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost"; + public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost"; public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount"; private final String 
supervisorId; @@ -70,6 +69,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private final ServiceMetricEvent.Builder metricBuilder; private final ScheduledExecutorService autoscalerExecutor; private final WeightedCostFunction costFunction; + private volatile CostMetrics lastKnownMetrics; public CostBasedAutoScaler( SeekableStreamSupervisor supervisor, @@ -86,7 +86,8 @@ public CostBasedAutoScaler( this.costFunction = new WeightedCostFunction(); - this.autoscalerExecutor = Execs.scheduledSingleThreaded("CostBasedAutoScaler-" + StringUtils.encodeForFormat(spec.getId())); + this.autoscalerExecutor = Execs.scheduledSingleThreaded("CostBasedAutoScaler-" + + StringUtils.encodeForFormat(spec.getId())); this.metricBuilder = ServiceMetricEvent.builder() .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) .setDimension( @@ -98,12 +99,8 @@ public CostBasedAutoScaler( @Override public void start() { - Callable scaleAction = () -> computeOptimalTaskCount(this.collectMetrics()); - Runnable onSuccessfulScale = () -> { - }; - autoscalerExecutor.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), + supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter), config.getScaleActionPeriodMillis(), config.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS @@ -129,46 +126,25 @@ public void reset() // No-op. 
} - private CostMetrics collectMetrics() + @Override + public int computeTaskCountForRollover() { - if (spec.isSuspended()) { - log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId); - return null; - } - - final LagStats lagStats = supervisor.computeLagStats(); - if (lagStats == null) { - log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId); - return null; - } - - final int currentTaskCount = supervisor.getIoConfig().getTaskCount(); - final int partitionCount = supervisor.getPartitionCount(); - - final Map> taskStats = supervisor.getStats(); - final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME); - final double pollIdleRatio = extractPollIdleRatio(taskStats); + return computeOptimalTaskCount(lastKnownMetrics); + } - final double avgPartitionLag = lagStats.getAvgLag(); + public int computeTaskCountForScaleAction() + { + final CostMetrics metrics = collectMetrics(); + lastKnownMetrics = metrics; + final int optimalTaskCount = computeOptimalTaskCount(metrics); - // Use an actual 15-minute moving average processing rate if available - final double avgProcessingRate; - if (movingAvgRate > 0) { - avgProcessingRate = movingAvgRate; - } else { - // Fallback: estimate processing rate based on idle ratio - final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio); - avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio; - } + // Only scale-up results are returned; collectMetrics() returns null when collection is skipped, so guard it + return metrics != null && optimalTaskCount >= metrics.getCurrentTaskCount() ? 
optimalTaskCount : -1; + } - return new CostMetrics( - avgPartitionLag, - currentTaskCount, - partitionCount, - pollIdleRatio, - supervisor.getIoConfig().getTaskDuration().getStandardSeconds(), - avgProcessingRate - ); + public CostBasedAutoScalerConfig getConfig() + { + return config; } /** @@ -184,7 +160,7 @@ private CostMetrics collectMetrics() * * @return optimal task count for scale-up, or -1 if no scaling action needed */ - public int computeOptimalTaskCount(CostMetrics metrics) + int computeOptimalTaskCount(CostMetrics metrics) { if (metrics == null) { log.debug("No metrics available yet for supervisorId [%s]", supervisorId); @@ -204,33 +180,28 @@ public int computeOptimalTaskCount(CostMetrics metrics) return -1; } - // If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved. - // No scaling is needed - maintain stability by staying at the current task count. - final double currentIdleRatio = metrics.getPollIdleRatio(); - if (currentIdleRatio >= 0 && WeightedCostFunction.isIdleInIdealRange(currentIdleRatio)) { - log.debug( - "Idle ratio [%.3f] is in ideal range for supervisorId [%s], no scaling needed", - currentIdleRatio, - supervisorId - ); - return -1; - } - int optimalTaskCount = -1; - double optimalCost = Double.POSITIVE_INFINITY; + CostResult optimalCost = new CostResult(); for (int taskCount : validTaskCounts) { - double cost = costFunction.computeCost(metrics, taskCount, config); - log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost); - if (cost < optimalCost) { + CostResult costResult = costFunction.computeCost(metrics, taskCount, config); + double cost = costResult.totalCost(); + log.debug( + "Proposed task count: %d, Cost: %.4f (lag: %.4f, idle: %.4f)", + taskCount, + cost, + costResult.lagCost(), + costResult.idleCost() + ); + if (cost < optimalCost.totalCost()) { optimalTaskCount = taskCount; - optimalCost = cost; + optimalCost = costResult; } } - emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, 
metrics.getAvgPartitionLag())); - emitter.emit(metricBuilder.setMetric(AVG_IDLE_METRIC, metrics.getPollIdleRatio())); emitter.emit(metricBuilder.setMetric(OPTIMAL_TASK_COUNT_METRIC, (long) optimalTaskCount)); + emitter.emit(metricBuilder.setMetric(LAG_COST_METRIC, optimalCost.lagCost())); + emitter.emit(metricBuilder.setMetric(IDLE_COST_METRIC, optimalCost.idleCost())); log.debug( "Cost-based scaling evaluation for supervisorId [%s]: current=%d, optimal=%d, cost=%.4f, " @@ -238,7 +209,7 @@ public int computeOptimalTaskCount(CostMetrics metrics) supervisorId, metrics.getCurrentTaskCount(), optimalTaskCount, - optimalCost, + optimalCost.totalCost(), metrics.getAvgPartitionLag(), metrics.getPollIdleRatio() ); @@ -246,8 +217,8 @@ public int computeOptimalTaskCount(CostMetrics metrics) if (optimalTaskCount == currentTaskCount) { return -1; } - // Perform both scale-up and scale-down proactively - // Future versions may perform scale-down on task rollover only + + // Scale up is performed eagerly. return optimalTaskCount; } @@ -264,26 +235,22 @@ static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount) return new int[]{}; } - List result = new ArrayList<>(); + Set result = new HashSet<>(); final int currentPartitionsPerTask = partitionCount / currentTaskCount; // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa. 
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK); - final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK); + final int maxPartitionsPerTask = Math.min( + partitionCount, + currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK + ); for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) { final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask; - if (result.isEmpty() || result.get(result.size() - 1) != taskCount) { - result.add(taskCount); - } + result.add(taskCount); } return result.stream().mapToInt(Integer::intValue).toArray(); } - public CostBasedAutoScalerConfig getConfig() - { - return config; - } - /** * Extracts the average poll-idle-ratio metric from task stats. * This metric indicates how much time the consumer spends idle waiting for data. @@ -324,9 +291,9 @@ static double extractPollIdleRatio(Map> taskStats) * * @param taskStats the stats map from supervisor.getStats() * @return the average 15-minute processing rate across all tasks in records/second, - * or -1 if no valid metrics are available + * or -1 if no valid metrics are available */ - static double extractMovingAverage(Map> taskStats, String movingAveragePeriodKey) + static double extractMovingAverage(Map> taskStats) { if (taskStats == null || taskStats.isEmpty()) { return -1; @@ -341,9 +308,15 @@ static double extractMovingAverage(Map> taskStats, S if (movingAveragesObj instanceof Map) { Object buildSegmentsObj = ((Map) movingAveragesObj).get(RowIngestionMeters.BUILD_SEGMENTS); if (buildSegmentsObj instanceof Map) { - Object fifteenMinObj = ((Map) buildSegmentsObj).get(movingAveragePeriodKey); - if (fifteenMinObj instanceof Map) { - Object processedRate = ((Map) fifteenMinObj).get(RowIngestionMeters.PROCESSED); + Object movingAvgObj = ((Map) 
buildSegmentsObj).get(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME); + if (movingAvgObj == null) { + movingAvgObj = ((Map) buildSegmentsObj).get(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME); + if (movingAvgObj == null) { + movingAvgObj = ((Map) buildSegmentsObj).get(DropwizardRowIngestionMeters.ONE_MINUTE_NAME); + } + } + if (movingAvgObj instanceof Map) { + Object processedRate = ((Map) movingAvgObj).get(RowIngestionMeters.PROCESSED); if (processedRate instanceof Number) { sum += ((Number) processedRate).doubleValue(); count++; @@ -357,4 +330,47 @@ static double extractMovingAverage(Map> taskStats, S return count > 0 ? sum / count : -1; } + + private CostMetrics collectMetrics() + { + if (spec.isSuspended()) { + log.debug("Supervisor [%s] is suspended, skipping a metrics collection", supervisorId); + return null; + } + + final LagStats lagStats = supervisor.computeLagStats(); + if (lagStats == null) { + log.debug("Lag stats unavailable for supervisorId [%s], skipping collection", supervisorId); + return null; + } + + final int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + final int partitionCount = supervisor.getPartitionCount(); + + final Map> taskStats = supervisor.getStats(); + final double movingAvgRate = extractMovingAverage(taskStats); + final double pollIdleRatio = extractPollIdleRatio(taskStats); + + final double avgPartitionLag = lagStats.getAvgLag(); + + // Use the observed moving-average processing rate (15-, else 5-, else 1-minute window) if available + final double avgProcessingRate; + if (movingAvgRate > 0) { + avgProcessingRate = movingAvgRate; + } else { + // Fallback: scale the configured default processing rate by the utilization (1 - idle ratio, floored at 0.01) + final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio); + avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio; + } + + return new CostMetrics( + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio, + supervisor.getIoConfig().getTaskDuration().getStandardSeconds(), + avgProcessingRate + ); 
+ } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java new file mode 100644 index 000000000000..42096dd61e17 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +/** + * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}. + * All costs are measured in seconds. The no-arg constructor yields a result whose costs are all {@link Double#POSITIVE_INFINITY}. 
+ */ +public class CostResult +{ + + private final double totalCost; + private final double lagCost; + private final double idleCost; + + public CostResult() + { + this(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); + } + + /** + * @param totalCost the overall cost; for a regular computation this is lagCost + idleCost, both already weighted + * @param lagCost the weighted cost representing expected time (seconds) to recover current lag + * @param idleCost the weighted cost representing total compute time (seconds) wasted being idle per task duration + */ + public CostResult(double totalCost, double lagCost, double idleCost) + { + this.totalCost = totalCost; + this.lagCost = lagCost; + this.idleCost = idleCost; + } + + public double totalCost() + { + return totalCost; + } + + public double lagCost() + { + return lagCost; + } + + public double idleCost() + { + return idleCost; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 1af8233527a5..0da733ef9e71 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -58,14 +58,15 @@ public static boolean isIdleInIdealRange(double idleRatio) * *

* Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}. - * This approach directly connects costs to operational metricsю + * This approach directly connects costs to operational metrics. * - * @return cost score in seconds, or {@link Double#POSITIVE_INFINITY} for invalid inputs + * @return CostResult containing totalCost, lagCost, and idleCost, + * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs */ - public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedAutoScalerConfig config) + public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedAutoScalerConfig config) { if (metrics == null || config == null || proposedTaskCount <= 0 || metrics.getPartitionCount() <= 0) { - return Double.POSITIVE_INFINITY; + return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); } final double avgProcessingRate = metrics.getAvgProcessingRate(); @@ -74,9 +75,9 @@ public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedA // Metrics are unavailable - favor maintaining the current task count. // We're conservative about scale up, but won't let an unlikey scale down to happen. if (proposedTaskCount == metrics.getCurrentTaskCount()) { - return 0.01d; + return new CostResult(0.01d, 0.0, 0.0); } else { - return Double.POSITIVE_INFINITY; + return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); } } else { // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks. 
@@ -86,19 +87,21 @@ public double computeCost(CostMetrics metrics, int proposedTaskCount, CostBasedA final double predictedIdleRatio = estimateIdleRatio(metrics, proposedTaskCount); final double idleCost = proposedTaskCount * metrics.getTaskDurationSeconds() * predictedIdleRatio; - final double cost = config.getLagWeight() * lagRecoveryTime + config.getIdleWeight() * idleCost; + final double lagCost = config.getLagWeight() * lagRecoveryTime; + final double weightedIdleCost = config.getIdleWeight() * idleCost; + final double cost = lagCost + weightedIdleCost; log.debug( - "Cost for taskCount[%d]: lagRecoveryTime[%.2fs], idleCost[%.2fs], " + "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], " + "predictedIdle[%.3f], finalCost[%.2fs]", proposedTaskCount, - lagRecoveryTime, - idleCost, + lagCost, + weightedIdleCost, predictedIdleRatio, cost ); - return cost; + return new CostResult(cost, lagCost, weightedIdleCost); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index b61c93af3c8f..7794a798473b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -102,7 +102,9 @@ public void testCreateUpdateAndRemoveSupervisor() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); metadataSupervisorManager.insert("id1", spec); supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.start(); @@ -115,6 +117,7 @@ public void testCreateUpdateAndRemoveSupervisor() resetAll(); supervisor2.start(); + 
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.stop(true); replayAll(); @@ -164,6 +167,7 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of()); metadataSupervisorManager.insert("id1", spec); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.start(); @@ -225,6 +229,7 @@ public void testShouldUpdateSupervisor() ); EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.start(); Assert.assertFalse(manager.shouldUpdateSupervisor(spec)); @@ -249,6 +254,7 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept ); EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); exception.expect(DruidException.class); replayAll(); manager.start(); @@ -332,6 +338,7 @@ public void testGetSupervisorStatus() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.expect(supervisor1.getStatus()).andReturn(report); replayAll(); @@ -352,6 +359,7 @@ public void testHandoffTaskGroupsEarly() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1)); replayAll(); @@ -373,6 +381,7 @@ public void testHandoffTaskGroupsEarlyOnNonStreamSupervisor() 
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); @@ -414,6 +423,8 @@ public void testStartIndividualSupervisorsFailStart() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.start(); EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor explosion")); replayAll(); @@ -433,7 +444,9 @@ public void testNoPersistOnFailedStart() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.stop(true); + EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor2.start(); EasyMock.expectLastCall().andThrow(new RuntimeException("supervisor failed to start")); replayAll(); @@ -461,6 +474,7 @@ public void testStopThrowsException() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.stopAsync(); EasyMock.expectLastCall().andThrow(new RuntimeException("RTE")); replayAll(); @@ -479,6 +493,7 @@ public void testResetSupervisor() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class)); replayAll(); @@ 
-498,6 +513,7 @@ public void testResetOnNonStreamSupervisor() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.start(); @@ -533,6 +549,7 @@ public void testResetSupervisorWithSpecificOffsets() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.resetOffsets(datasourceMetadata); replayAll(); @@ -558,7 +575,9 @@ public void testCreateSuspendResumeAndStopSupervisor() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); metadataSupervisorManager.insert("id1", spec); supervisor3.start(); + EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.start(); @@ -574,6 +593,7 @@ public void testCreateSuspendResumeAndStopSupervisor() resetAll(); metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor2.start(); + EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); supervisor1.stop(true); replayAll(); @@ -589,6 +609,7 @@ public void testCreateSuspendResumeAndStopSupervisor() metadataSupervisorManager.insert(EasyMock.eq("id1"), EasyMock.capture(capturedInsert)); supervisor2.stop(true); supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); manager.suspendOrResumeSupervisor("id1", false); @@ -631,29 +652,29 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); SeekableStreamSupervisorSpec 
suspendedSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + Supervisor suspendedSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes(); EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes(); EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes(); EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes(); EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes(); EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes(); - EasyMock.replay(suspendedSpec); + EasyMock.replay(suspendedSpec, suspendedSupervisor); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); SeekableStreamSupervisorSpec activeSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + Supervisor activeSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes(); EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes(); EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes(); EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes(); EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes(); - EasyMock.replay(activeSpec); + EasyMock.replay(activeSpec, activeSupervisor); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - Supervisor 
activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class); + Supervisor activeSupervisorWithConcurrentLocks = EasyMock.createNiceMock(SeekableStreamSupervisor.class); EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes(); EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources()) @@ -664,11 +685,11 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() .andReturn(null).anyTimes(); EasyMock.expect(activeSpecWithConcurrentLocks.getContext()) .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes(); - EasyMock.replay(activeSpecWithConcurrentLocks); + EasyMock.replay(activeSpecWithConcurrentLocks, activeSupervisorWithConcurrentLocks); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + Supervisor activeAppendSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes(); @@ -678,12 +699,12 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name() )).anyTimes(); - EasyMock.replay(activeAppendSpec); + EasyMock.replay(activeAppendSpec, activeAppendSupervisor); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); // A supervisor with useConcurrentLocks set to false explicitly must not use an append lock SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = 
EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class); + Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.createNiceMock(SeekableStreamSupervisor.class); EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes(); EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources()) @@ -697,7 +718,7 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name() )).anyTimes(); - EasyMock.replay(specWithUseConcurrentLocksFalse); + EasyMock.replay(specWithUseConcurrentLocksFalse, supervisorWithUseConcurrentLocksFalse); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); replayAll(); @@ -737,7 +758,7 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor() metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); SeekableStreamSupervisorSpec streamingSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); - SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject()); EasyMock.expectLastCall().once(); EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes(); @@ -746,7 +767,7 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor() EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes(); EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes(); EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes(); - EasyMock.replay(streamingSpec); + EasyMock.replay(streamingSpec, streamSupervisor); 
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); EasyMock.expectLastCall().once(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 58ffb8438e0d..f0b15f8c8a09 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; @@ -62,6 +63,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.metadata.TestSupervisorSpec; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -678,6 +680,103 @@ public void testAutoScalerCreated() Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler); } + @Test + public void testAutoScalerReturnsNoopWhenSupervisorIsNotSeekableStreamSupervisor() + { + // Test the branch where supervisor instanceof SeekableStreamSupervisor is false + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("taskCountMax", 
8); + autoScalerConfig.put("taskCountMin", 1); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + + EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); + EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.replay(supervisor4); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + // Create a non-SeekableStreamSupervisor mock + Supervisor nonSeekableStreamSupervisor = EasyMock.mock(Supervisor.class); + EasyMock.replay(nonSeekableStreamSupervisor); + + // When passing a non-SeekableStreamSupervisor, should return NoopTaskAutoScaler + SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(nonSeekableStreamSupervisor); + Assert.assertTrue( + "Should return NoopTaskAutoScaler when supervisor is not SeekableStreamSupervisor", + autoscaler instanceof NoopTaskAutoScaler + ); + } + + @Test + public void testAutoScalerReturnsNoopWhenConfigIsNull() + { + // Test the branch where autoScalerConfig is null + 
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) + .andReturn(null) + .anyTimes(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + + EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); + EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.replay(supervisor4); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + // When autoScalerConfig is null, should return NoopTaskAutoScaler + SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4); + Assert.assertTrue( + "Should return NoopTaskAutoScaler when autoScalerConfig is null", + autoscaler instanceof NoopTaskAutoScaler + ); + } + @Test public void testDefaultAutoScalerConfigCreatedWithDefault() { @@ -857,71 +956,6 @@ public int getActiveTaskGroupsCount() autoScaler.stop(); } - @Test - public void test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() throws InterruptedException - { - EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - 
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); - EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); - EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); - EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); - EasyMock.replay(spec); - - EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); - EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); - EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); - EasyMock.replay(ingestionSchema); - - EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); - EasyMock.replay(taskMaster); - - StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); - - LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( - supervisor, - DATASOURCE, - mapper.convertValue( - getScaleOutProperties(2), - LagBasedAutoScalerConfig.class - ), - spec, - dynamicActionEmitter - ); - - supervisor.addTaskGroupToPendingCompletionTaskGroup( - 0, - ImmutableMap.of("0", "0"), - null, - null, - Set.of("dummyTask"), - Collections.emptySet() - ); - - supervisor.start(); - autoScaler.start(); - - supervisor.runInternal(); - Thread.sleep(1000); // ensure a dynamic allocation notice completes - - Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue()); - Assert.assertTrue( - dynamicActionEmitter - .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) - .stream() - .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) - .filter(Objects::nonNull) - .anyMatch("There are tasks pending completion"::equals) - ); - - 
emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC); - autoScaler.reset(); - autoScaler.stop(); - } - @Test public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException { @@ -1593,6 +1627,175 @@ public String getSource() originalSpec.validateSpecUpdateTo(proposedSpecSameSource); } + @Test + public void test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() throws InterruptedException + { + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(2), + LagBasedAutoScalerConfig.class + ), + spec, + dynamicActionEmitter + ); + + 
supervisor.addTaskGroupToPendingCompletionTaskGroup( + 0, + ImmutableMap.of("0", "0"), + null, + null, + Set.of("dummyTask"), + Collections.emptySet() + ); + + supervisor.start(); + autoScaler.start(); + + supervisor.runInternal(); + Thread.sleep(1000); // ensure a dynamic allocation notice completes + + Assert.assertEquals(1, supervisor.getIoConfig().getTaskCount().intValue()); + Assert.assertTrue( + dynamicActionEmitter + .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .stream() + .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) + .filter(Objects::nonNull) + .anyMatch("There are tasks pending completion"::equals) + ); + + emitter.verifyNotEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC); + autoScaler.reset(); + autoScaler.stop(); + } + + @Test + public void test_dynamicAllocationNotice_skips_whenSupervisorSuspended() throws InterruptedException + { + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + // Suspended → DynamicAllocationTasksNotice should return early and not scale + EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + 
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(2), + LagBasedAutoScalerConfig.class + ), + spec, + emitter + ); + + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + + int before = supervisor.getIoConfig().getTaskCount(); + Thread.sleep(1000); + int after = supervisor.getIoConfig().getTaskCount(); + // No scaling expected because supervisor is suspended + Assert.assertEquals(before, after); + + autoScaler.reset(); + autoScaler.stop(); + } + + @Test + public void test_changeTaskCountInIOConfig_handlesExceptionAndStillUpdatesTaskCount() throws InterruptedException + { + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + // SupervisorManager present but metadata insert fails → should be handled + SupervisorManager sm = 
EasyMock.createMock(SupervisorManager.class); + MetadataSupervisorManager msm = EasyMock.createMock(MetadataSupervisorManager.class); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(sm)).anyTimes(); + EasyMock.expect(sm.getMetadataSupervisorManager()).andReturn(msm).anyTimes(); + msm.insert(EasyMock.anyString(), EasyMock.anyObject()); + EasyMock.expectLastCall().andThrow(new RuntimeException("boom")).anyTimes(); + EasyMock.replay(taskMaster, sm, msm); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + getScaleOutProperties(2), + LagBasedAutoScalerConfig.class + ), + spec, + emitter + ); + + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + + int before = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, before); + Thread.sleep(1000); // allow one dynamic allocation cycle + int after = supervisor.getIoConfig().getTaskCount(); + // Even though metadata insert failed, taskCount should still be updated in ioConfig + Assert.assertEquals(2, after); + + autoScaler.reset(); + autoScaler.stop(); + } + @Test public void testMergeSpecConfigs() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java new file mode 100644 index 000000000000..3e4bcd92b99f --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.File; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; + +public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends EasyMockSupport +{ + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); + private static final String STREAM = "stream"; + private static final 
String DATASOURCE = "testDS"; + private static final String SUPERVISOR = "supervisor"; + private static final int DEFAULT_TASK_COUNT = 10; + + private TaskStorage taskStorage; + private TaskMaster taskMaster; + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private ServiceEmitter emitter; + private RowIngestionMetersFactory rowIngestionMetersFactory; + private SeekableStreamIndexTaskClientFactory taskClientFactory; + private SeekableStreamSupervisorSpec spec; + private SupervisorStateManagerConfig supervisorConfig; + + @Before + public void setUp() + { + taskStorage = EasyMock.mock(TaskStorage.class); + taskMaster = EasyMock.mock(TaskMaster.class); + indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class); + emitter = new StubServiceEmitter(); + rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class); + taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class); + spec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + supervisorConfig = new SupervisorStateManagerConfig(); + + // Common taskMaster setup - used by all tests + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + } + + @Test + public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() + { + // Given + setupSpecExpectations(getIOConfigWithoutAutoScaler(5)); + EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(spec); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); + supervisor.start(); + + int beforeTaskCount = supervisor.getIoConfig().getTaskCount(); + + // When + supervisor.maybeScaleDuringTaskRollover(); + + // Then + Assert.assertEquals( + "Task count should not change when taskAutoScaler is null", + beforeTaskCount, + (int) 
supervisor.getIoConfig().getTaskCount() + ); + } + + @Test + public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() + { + // Given + setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); + EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())) + .andReturn(createMockAutoScaler(-1)) + .anyTimes(); + EasyMock.replay(spec); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100); + supervisor.start(); + supervisor.createAutoscaler(spec); + + int beforeTaskCount = supervisor.getIoConfig().getTaskCount(); + + // When + supervisor.maybeScaleDuringTaskRollover(); + + // Then + Assert.assertEquals( + "Task count should not change when rolloverTaskCount <= 0", + beforeTaskCount, + (int) supervisor.getIoConfig().getTaskCount() + ); + } + + @Test + public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() + { + // Given + final int targetTaskCount = 5; + + setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); + EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())) + .andReturn(createMockAutoScaler(targetTaskCount)) + .anyTimes(); + EasyMock.replay(spec); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(100); + supervisor.start(); + supervisor.createAutoscaler(spec); + + Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + + // When + supervisor.maybeScaleDuringTaskRollover(); + + // Then + Assert.assertEquals( + "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0", + targetTaskCount, + (int) supervisor.getIoConfig().getTaskCount() + ); + } + + @Test + public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() + { + // Given + setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); + EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())) + .andReturn(createMockAutoScaler(0)) + .anyTimes(); + EasyMock.replay(spec); + + TestSeekableStreamSupervisor 
supervisor = new TestSeekableStreamSupervisor(100); + supervisor.start(); + supervisor.createAutoscaler(spec); + + int beforeTaskCount = supervisor.getIoConfig().getTaskCount(); + + // When + supervisor.maybeScaleDuringTaskRollover(); + + // Then + Assert.assertEquals( + "Task count should not change when rolloverTaskCount is 0", + beforeTaskCount, + (int) supervisor.getIoConfig().getTaskCount() + ); + } + + // Helper methods for test setup + + /** + * Sets up common spec expectations. Call EasyMock.replay(spec) after this and any additional expectations. + */ + private void setupSpecExpectations(SeekableStreamSupervisorIOConfig ioConfig) + { + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + } + + /** + * Creates a mock autoscaler that returns the specified rollover count. 
+ */ + private static SupervisorTaskAutoScaler createMockAutoScaler(int rolloverCount) + { + return new SupervisorTaskAutoScaler() + { + @Override + public void start() + { + } + + @Override + public void stop() + { + } + + @Override + public void reset() + { + } + + @Override + public int computeTaskCountForRollover() + { + return rolloverCount; + } + }; + } + + // Helper methods for config creation + + private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig() + { + return CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.3) + .idleWeight(0.7) + .scaleActionPeriodMillis(100) + .build(); + } + + private SeekableStreamSupervisorIOConfig getIOConfigWithCostBasedAutoScaler() + { + return createIOConfig(DEFAULT_TASK_COUNT, getCostBasedAutoScalerConfig()); + } + + private SeekableStreamSupervisorIOConfig getIOConfigWithoutAutoScaler(int taskCount) + { + return createIOConfig(taskCount, null); + } + + private SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, CostBasedAutoScalerConfig autoScalerConfig) + { + return new SeekableStreamSupervisorIOConfig( + STREAM, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + taskCount, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + autoScalerConfig, + LagAggregator.DEFAULT, + null, + null, + null + ) + { + }; + } + + private static DataSchema getDataSchema() + { + List dimensions = new ArrayList<>(); + dimensions.add(StringDimensionSchema.create("dim1")); + dimensions.add(StringDimensionSchema.create("dim2")); + + return DataSchema.builder() + .withDataSource(DATASOURCE) + .withTimestamp(new TimestampSpec("timestamp", "iso", null)) + .withDimensions(dimensions) + .withAggregators(new CountAggregatorFactory("rows")) + .withGranularity( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + 
ImmutableList.of() + ) + ) + .build(); + } + + private static SeekableStreamSupervisorTuningConfig getTuningConfig() + { + return new SeekableStreamSupervisorTuningConfig() + { + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new SeekableStreamIndexTaskTuningConfig(null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, + null, null, null + ) + { + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return null; + } + + @Override + public String toString() + { + return null; + } + }; + } + }; + } + + // Inner test classes + + private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor + { + private BaseTestSeekableStreamSupervisor() + { + super( + "testSupervisorId", + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory, + false + ); + } + + @Override + protected String baseTaskName() + { + return "test"; + } + + @Override + protected void updatePartitionLagFromStream() + { + } + + @Nullable + @Override + protected Map getPartitionRecordLag() + { + return null; + } + + @Nullable + @Override + protected Map getPartitionTimeLag() + { + return null; + } + + @Override + protected SeekableStreamIndexTaskIOConfig 
createTaskIoConfig( + int groupId, + Map startPartitions, + Map endPartitions, + String baseSequenceName, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + Set exclusiveStartSequenceNumberPartitions, + SeekableStreamSupervisorIOConfig ioConfig + ) + { + return new SeekableStreamIndexTaskIOConfig<>( + groupId, + baseSequenceName, + new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions), + new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions), + true, + minimumMessageTime, + maximumMessageTime, + ioConfig.getInputFormat(), + ioConfig.getTaskDuration().getStandardMinutes() + ) + { + }; + } + + @Override + protected List> createIndexTasks( + int replicas, + String baseSequenceName, + ObjectMapper sortingMapper, + TreeMap> sequenceOffsets, + SeekableStreamIndexTaskIOConfig taskIoConfig, + SeekableStreamIndexTaskTuningConfig taskTuningConfig, + RowIngestionMetersFactory rowIngestionMetersFactory + ) + { + return null; + } + + @Override + protected int getTaskGroupIdForPartition(String partition) + { + return 0; + } + + @Override + protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) + { + return true; + } + + @Override + protected boolean doesTaskMatchSupervisor(Task task) + { + return true; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + String stream, + Map map + ) + { + return null; + } + + @Override + protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean isExclusive) + { + return new OrderedSequenceNumber<>(seq, isExclusive) + { + @Override + public int compareTo(OrderedSequenceNumber o) + { + return new BigInteger(this.get()).compareTo(new BigInteger(o.get())); + } + }; + } + + @Override + protected Map getRecordLagPerPartition(Map currentOffsets) + { + return null; + } + + @Override + protected Map getTimeLagPerPartition(Map currentOffsets) + { + return null; + } + + @Override + protected RecordSupplier 
setupRecordSupplier() + { + return recordSupplier; + } + + @Override + protected SeekableStreamSupervisorReportPayload createReportPayload( + int numPartitions, + boolean includeOffsets + ) + { + return new SeekableStreamSupervisorReportPayload<>(SUPERVISOR, DATASOURCE, STREAM, 1, 1, 1L, + null, null, null, null, null, null, + false, true, null, null, null + ) + { + }; + } + + @Override + protected String getNotSetMarker() + { + return "NOT_SET"; + } + + @Override + protected String getEndOfPartitionMarker() + { + return "EOF"; + } + + @Override + protected boolean isEndOfShard(String seqNum) + { + return false; + } + + @Override + protected boolean isShardExpirationMarker(String seqNum) + { + return false; + } + + @Override + protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() + { + return false; + } + } + + private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor + { + private final int partitionNumbers; + + public TestSeekableStreamSupervisor(int partitionNumbers) + { + this.partitionNumbers = partitionNumbers; + } + + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + } + + @Override + public LagStats computeLagStats() + { + return new LagStats(0, 0, 0); + } + + @Override + public int getPartitionCount() + { + return partitionNumbers; + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 5df9edd184d2..71e798e7e2fa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2751,6 +2751,28 @@ public void testMaxAllowedStopsWithStopTaskCountRatio() 
Assert.assertEquals(12, config.getMaxAllowedStops()); } + @Test + public void testCreateAutoscalerStoresAndReturnsAutoscaler() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes(); + + replayAll(); + + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Test that createAutoscaler returns null when spec returns null + SeekableStreamSupervisorSpec mockSpec = EasyMock.createMock(SeekableStreamSupervisorSpec.class); + EasyMock.expect(mockSpec.createAutoscaler(supervisor)).andReturn(null).once(); + EasyMock.replay(mockSpec); + + Assert.assertNull(supervisor.createAutoscaler(mockSpec)); + EasyMock.verify(mockSpec); + + verifyAll(); + } + private static DataSchema getDataSchema() { List dimensions = new ArrayList<>(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index 44bbef9c6a15..caf5453f5217 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; -import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; @@ -35,6 +34,9 @@ import java.util.HashMap; import java.util.Map; 
+import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME; +import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME; +import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME; import static org.mockito.Mockito.when; public class CostBasedAutoScalerTest @@ -69,12 +71,23 @@ public void testComputeValidTaskCounts() { // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34 int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 25); + Assert.assertTrue("Should contain the current task count", contains(validTaskCounts, 25)); + Assert.assertTrue("Should contain the next scale-up option", contains(validTaskCounts, 34)); - Assert.assertTrue("Should contain current task count", contains(validTaskCounts, 25)); - Assert.assertTrue("Should contain next scale-up option", contains(validTaskCounts, 34)); + // Edge cases + Assert.assertEquals("Zero partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length); + Assert.assertEquals("Negative partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length); - // Edge case: zero partitions returns empty array - Assert.assertEquals(0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length); + // Single partition + int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1); + Assert.assertTrue("Single partition should have at least one valid count", singlePartition.length > 0); + Assert.assertTrue("Single partition should contain 1", contains(singlePartition, 1)); + + // Current exceeds partitions - should still yield valid, deduplicated options + int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5); + Assert.assertEquals(2, exceedsPartitions.length); + Assert.assertTrue(contains(exceedsPartitions, 1)); + Assert.assertTrue(contains(exceedsPartitions, 2)); } @Test @@ 
-82,42 +95,24 @@ public void testComputeOptimalTaskCountInvalidInputs() { Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(null)); Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(0.0, 10, 0, 0.0))); - } - - @Test - public void testComputeOptimalTaskCountIdleInIdealRange() - { - // When idle is in ideal range [0.2, 0.6], no scaling should occur - Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(5000.0, 25, 100, 0.4))); + Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3))); + Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3))); } @Test public void testComputeOptimalTaskCountScaling() { // High idle (underutilized) - should scale down - // With high idle (0.8), the algorithm evaluates lower task counts and finds they have lower idle cost int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8)); Assert.assertTrue("Should scale down when idle > 0.6", scaleDownResult < 25); - } - @Test - public void testComputeOptimalTaskCountLowIdleDoesNotScaleUpWithBalancedWeights() - { - // With corrected idle ratio model and marginal lag model, low idle does not - // automatically trigger scale-up. The algorithm is conservative because: - // 1. Scale-up increases idle cost (more tasks = more idle per task with fixed load) - // 2. 
Marginal lag model means only ADDITIONAL tasks work on backlog - // - // This is intentional: the idle-heavy weights (0.4 idle) make the algorithm - // favor stability over aggressive scaling - int result = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); - - // Algorithm evaluates costs and may find current count optimal - // or may scale down if idle cost reduction outweighs lag increase - Assert.assertTrue( - "With low idle and balanced weights, algorithm should not scale up aggressively", - result == -1 || result <= 25 - ); + // Very high idle with high task count - should scale down + int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9)); + Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50); + + // With low idle and balanced weights, algorithm should not scale up aggressively + int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); + Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25); } @Test @@ -142,35 +137,219 @@ public void testExtractPollIdleRatio() } @Test - public void testExtractProcessingRateMovingAverage() + public void testExtractPollIdleRatioInvalidTypes() + { + // Non-map task metric + Map> nonMapTask = new HashMap<>(); + nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map")); + Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapTask), 0.0001); + + // Empty autoscaler metrics + Map> emptyAutoscaler = new HashMap<>(); + Map taskStats1 = new HashMap<>(); + taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>()); + emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1)); + Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler), 0.0001); + + // Non-map autoscaler metrics + Map> nonMapAutoscaler = new HashMap<>(); + Map taskStats2 = new 
HashMap<>(); + taskStats2.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, "not-a-map"); + nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2)); + Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler), 0.0001); + + // Non-number poll idle ratio + Map> nonNumberRatio = new HashMap<>(); + Map taskStats3 = new HashMap<>(); + Map autoscalerMetrics = new HashMap<>(); + autoscalerMetrics.put(SeekableStreamIndexTaskRunner.POLL_IDLE_RATIO_KEY, "not-a-number"); + taskStats3.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, autoscalerMetrics); + nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3)); + Assert.assertEquals(0., CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio), 0.0001); + } + + @Test + public void testExtractMovingAverage() { // Null and empty return -1 - Assert.assertEquals( - -1., - CostBasedAutoScaler.extractMovingAverage(null, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), - 0.0001 - ); - Assert.assertEquals( - -1., - CostBasedAutoScaler.extractMovingAverage( - Collections.emptyMap(), - DropwizardRowIngestionMeters.FIVE_MINUTE_NAME - ), - 0.0001 - ); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(null), 0.0001); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(Collections.emptyMap()), 0.0001); // Missing metrics return -1 Map> missingMetrics = new HashMap<>(); missingMetrics.put("0", Collections.singletonMap("task-0", new HashMap<>())); - Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingMetrics, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingMetrics), 0.0001); - // Valid stats return average + // Valid stats return average (using 5-minute) Map> validStats = new HashMap<>(); Map group = new HashMap<>(); group.put("task-0", buildTaskStatsWithMovingAverage(1000.0)); group.put("task-1", 
buildTaskStatsWithMovingAverage(2000.0)); validStats.put("0", group); - Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME), 0.0001); + Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(validStats), 0.0001); + } + + @Test + public void testExtractMovingAverageIntervalFallback() + { + // 15-minute average is preferred + Map> fifteenMin = new HashMap<>(); + fifteenMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0))); + Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001); + + // 1-minute as final fallback + Map> oneMin = new HashMap<>(); + oneMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0))); + Assert.assertEquals(500.0, CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001); + + // 15-minute preferred over 5-minute when both available + Map> allIntervals = new HashMap<>(); + allIntervals.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMultipleMovingAverages(1500.0, 1000.0, 500.0))); + Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(allIntervals), 0.0001); + + // Falls back to 5-minute when 15-minute is null + Map> nullFifteen = new HashMap<>(); + nullFifteen.put("0", Collections.singletonMap("task-0", buildTaskStatsWithNullInterval(FIFTEEN_MINUTE_NAME, FIVE_MINUTE_NAME, 750.0))); + Assert.assertEquals(750.0, CostBasedAutoScaler.extractMovingAverage(nullFifteen), 0.0001); + + // Falls back to 1-minute when both 15 and 5 are null + Map> bothNull = new HashMap<>(); + bothNull.put("0", Collections.singletonMap("task-0", buildTaskStatsWithTwoNullIntervals(250.0))); + Assert.assertEquals(250.0, CostBasedAutoScaler.extractMovingAverage(bothNull), 0.0001); + } + + @Test + public void testExtractMovingAverageInvalidTypes() + { + // Non-map task metric + Map> nonMapTask = new 
HashMap<>(); + nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map")); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapTask), 0.0001); + + // Missing buildSegments + Map> missingBuild = new HashMap<>(); + Map taskStats1 = new HashMap<>(); + taskStats1.put("movingAverages", new HashMap<>()); + missingBuild.put("0", Collections.singletonMap("task-0", taskStats1)); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(missingBuild), 0.0001); + + // Non-map movingAverages + Map> nonMapMA = new HashMap<>(); + Map taskStats2 = new HashMap<>(); + taskStats2.put("movingAverages", "not-a-map"); + nonMapMA.put("0", Collections.singletonMap("task-0", taskStats2)); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapMA), 0.0001); + + // Non-map buildSegments + Map> nonMapBS = new HashMap<>(); + Map taskStats3 = new HashMap<>(); + Map movingAverages3 = new HashMap<>(); + movingAverages3.put(RowIngestionMeters.BUILD_SEGMENTS, "not-a-map"); + taskStats3.put("movingAverages", movingAverages3); + nonMapBS.put("0", Collections.singletonMap("task-0", taskStats3)); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapBS), 0.0001); + + // Non-map interval data + Map> nonMapInterval = new HashMap<>(); + Map taskStats4 = new HashMap<>(); + Map movingAverages4 = new HashMap<>(); + Map buildSegments4 = new HashMap<>(); + buildSegments4.put(FIFTEEN_MINUTE_NAME, "not-a-map"); + movingAverages4.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments4); + taskStats4.put("movingAverages", movingAverages4); + nonMapInterval.put("0", Collections.singletonMap("task-0", taskStats4)); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonMapInterval), 0.0001); + + // Non-number processed rate + Map> nonNumberRate = new HashMap<>(); + Map taskStats5 = new HashMap<>(); + Map movingAverages5 = new HashMap<>(); + Map buildSegments5 = new HashMap<>(); + Map fifteenMin = new HashMap<>(); + 
fifteenMin.put(RowIngestionMeters.PROCESSED, "not-a-number"); + buildSegments5.put(FIFTEEN_MINUTE_NAME, fifteenMin); + movingAverages5.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments5); + taskStats5.put("movingAverages", movingAverages5); + nonNumberRate.put("0", Collections.singletonMap("task-0", taskStats5)); + Assert.assertEquals(-1., CostBasedAutoScaler.extractMovingAverage(nonNumberRate), 0.0001); + } + + @Test + public void testComputeTaskCountForRolloverReturnsMinusOneWhenSuspended() + { + SupervisorSpec spec = Mockito.mock(SupervisorSpec.class); + SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class); + ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); + SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(spec.getId()).thenReturn("s-up"); + when(spec.isSuspended()).thenReturn(true); + when(supervisor.getIoConfig()).thenReturn(ioConfig); + when(ioConfig.getStream()).thenReturn("stream"); + + CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.5) + .idleWeight(0.5) + .build(); + + CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter); + Assert.assertEquals(-1, scaler.computeTaskCountForRollover()); + } + + @Test + public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull() + { + SupervisorSpec spec = Mockito.mock(SupervisorSpec.class); + SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class); + ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); + SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(spec.getId()).thenReturn("s-up"); + when(spec.isSuspended()).thenReturn(false); + when(supervisor.computeLagStats()).thenReturn(null); + when(supervisor.getIoConfig()).thenReturn(ioConfig); + 
when(ioConfig.getStream()).thenReturn("stream"); + + CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.5) + .idleWeight(0.5) + .build(); + + CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter); + Assert.assertEquals(-1, scaler.computeTaskCountForRollover()); + } + + @Test + public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics() + { + // Tests the case where lastKnownMetrics is null (no computeTaskCountForScaleAction called) + SupervisorSpec spec = Mockito.mock(SupervisorSpec.class); + SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class); + ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); + SeekableStreamSupervisorIOConfig ioConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(spec.getId()).thenReturn("s-up"); + when(spec.isSuspended()).thenReturn(false); + when(supervisor.getIoConfig()).thenReturn(ioConfig); + when(ioConfig.getStream()).thenReturn("stream"); + + CostBasedAutoScalerConfig cfg = CostBasedAutoScalerConfig.builder() + .taskCountMax(10) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.5) + .idleWeight(0.5) + .build(); + + CostBasedAutoScaler scaler = new CostBasedAutoScaler(supervisor, cfg, spec, emitter); + // Should return -1 when lastKnownMetrics is null + Assert.assertEquals(-1, scaler.computeTaskCountForRollover()); } private CostMetrics createMetrics( @@ -213,7 +392,68 @@ private Map buildTaskStatsWithPollIdle(double pollIdleRatio) private Map buildTaskStatsWithMovingAverage(double processedRate) { Map buildSegments = new HashMap<>(); - buildSegments.put(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, processedRate)); + buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, processedRate)); + + Map movingAverages = new HashMap<>(); + 
movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments); + + Map taskStats = new HashMap<>(); + taskStats.put("movingAverages", movingAverages); + return taskStats; + } + + private Map buildTaskStatsWithMovingAverageForInterval(String intervalName, double processedRate) + { + Map buildSegments = new HashMap<>(); + buildSegments.put(intervalName, Map.of(RowIngestionMeters.PROCESSED, processedRate)); + + Map movingAverages = new HashMap<>(); + movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments); + + Map taskStats = new HashMap<>(); + taskStats.put("movingAverages", movingAverages); + return taskStats; + } + + private Map buildTaskStatsWithMultipleMovingAverages( + double fifteenMinRate, + double fiveMinRate, + double oneMinRate + ) + { + Map buildSegments = new HashMap<>(); + buildSegments.put(FIFTEEN_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, fifteenMinRate)); + buildSegments.put(FIVE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, fiveMinRate)); + buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, oneMinRate)); + + Map movingAverages = new HashMap<>(); + movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments); + + Map taskStats = new HashMap<>(); + taskStats.put("movingAverages", movingAverages); + return taskStats; + } + + private Map buildTaskStatsWithNullInterval(String nullInterval, String validInterval, double processedRate) + { + Map buildSegments = new HashMap<>(); + buildSegments.put(nullInterval, null); + buildSegments.put(validInterval, Map.of(RowIngestionMeters.PROCESSED, processedRate)); + + Map movingAverages = new HashMap<>(); + movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments); + + Map taskStats = new HashMap<>(); + taskStats.put("movingAverages", movingAverages); + return taskStats; + } + + private Map buildTaskStatsWithTwoNullIntervals(double oneMinRate) + { + Map buildSegments = new HashMap<>(); + buildSegments.put(FIFTEEN_MINUTE_NAME, null); + 
buildSegments.put(FIVE_MINUTE_NAME, null); + buildSegments.put(ONE_MINUTE_NAME, Map.of(RowIngestionMeters.PROCESSED, oneMinRate)); Map movingAverages = new HashMap<>(); movingAverages.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegments); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java index 0c5602052d4d..90b50477c924 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java @@ -47,11 +47,11 @@ public void testComputeCostInvalidInputs() { CostMetrics validMetrics = createMetrics(100000.0, 10, 100, 0.3); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(null, 10, config), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config), 0.0); + Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(null, 10, config).totalCost(), 0.0); + Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0); + Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0); + Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0); + Assert.assertEquals(Double.POSITIVE_INFINITY, 
costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), 0.0); } @Test @@ -68,8 +68,8 @@ public void testScaleDownHasHigherLagCostThanCurrent() CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3); - double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig); - double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig); + double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost(); + double costScaleDown = costFunction.computeCost(metrics, 5, lagOnlyConfig).totalCost(); // Scale down uses absolute model: lag / (5 * rate) = higher recovery time // Current uses absolute model: lag / (10 * rate) = lower recovery time @@ -97,15 +97,15 @@ public void testLagCostWithMarginalModel() CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); // Current (10 tasks): uses absolute model = 10M / (10 * 1000) = 1000s - double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig); + double costCurrent = costFunction.computeCost(metrics, 10, lagOnlyConfig).totalCost(); Assert.assertEquals("Cost at current tasks", 1000., costCurrent, 0.1); // Scale up by 5 (to 15): marginal model = 10M / (15 * 1000) = 666 - double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig); + double costUp5 = costFunction.computeCost(metrics, 15, lagOnlyConfig).totalCost(); Assert.assertEquals("Cost when scaling up by 5", 666.7, costUp5, 0.1); // Scale up by 10 (to 20): marginal model = 10M / (20 * 1000) = 500s - double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig); + double costUp10 = costFunction.computeCost(metrics, 20, lagOnlyConfig).totalCost(); Assert.assertEquals("Cost when scaling up by 10", 500.0, costUp10, 0.01); // Adding more tasks reduces lag recovery time @@ -120,8 +120,8 @@ public void testBalancedWeightsFavorStabilityOverScaleUp() // This is intentional behavior: the algorithm is conservative about scale-up. 
CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); - double costCurrent = costFunction.computeCost(metrics, 10, config); - double costScaleUp = costFunction.computeCost(metrics, 20, config); + double costCurrent = costFunction.computeCost(metrics, 10, config).totalCost(); + double costScaleUp = costFunction.computeCost(metrics, 20, config).totalCost(); // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from // scaling up dominates the lag recovery benefit @@ -154,8 +154,8 @@ public void testWeightsAffectCost() CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.1); - double costLag = costFunction.computeCost(metrics, 10, lagOnly); - double costIdle = costFunction.computeCost(metrics, 10, idleOnly); + double costLag = costFunction.computeCost(metrics, 10, lagOnly).totalCost(); + double costIdle = costFunction.computeCost(metrics, 10, idleOnly).totalCost(); Assert.assertNotEquals("Different weights should produce different costs", costLag, costIdle, 0.0001); Assert.assertTrue("Lag-only cost should be positive", costLag > 0.0); @@ -170,9 +170,9 @@ public void testNoProcessingRateFavorsCurrentTaskCount() int currentTaskCount = 10; CostMetrics metricsNoRate = createMetricsWithRate(50000.0, currentTaskCount, 100, 0.3, 0.0); - double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config); - double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config); - double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config); + double costAtCurrent = costFunction.computeCost(metricsNoRate, currentTaskCount, config).totalCost(); + double costScaleUp = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, config).totalCost(); + double costScaleDown = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, config).totalCost(); Assert.assertTrue( "Cost at current should be less than cost for scale up", @@ -201,8 +201,8 @@ public void 
testNoProcessingRateDeviationPenaltyIsSymmetric() .defaultProcessingRate(1000.0) .build(); - double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig); - double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig); + double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost(); + double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost(); Assert.assertEquals( "Lag cost for +5 and -5 deviation should be equal", @@ -229,10 +229,10 @@ public void testIdleCostMonotonicWithTaskCount() // Current: 10 tasks with 40% idle (60% busy) CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); - double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig); - double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig); - double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig); - double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig); + double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost(); + double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost(); + double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig).totalCost(); + double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost(); // Monotonically increasing idle cost as tasks increase Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10); @@ -256,7 +256,7 @@ public void testIdleRatioClampingAtBoundaries() // busyFraction = 0.6, taskRatio = 0.2 // predictedIdle = 1 - 0.6/0.2 = 1 - 3 = -2 → clamped to 0 CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); - double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig); + double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig).totalCost(); // idlenessCost = taskCount * taskDuration * 0.0 (clamped) = 0 Assert.assertEquals("Idle cost should be 0 
when predicted idle is clamped to 0", 0.0, costAt2, 0.0001); @@ -266,7 +266,7 @@ public void testIdleRatioClampingAtBoundaries() // busyFraction = 0.9, taskRatio = 10 // predictedIdle = 1 - 0.9/10 = 1 - 0.09 = 0.91 (within bounds) CostMetrics lowIdle = createMetrics(0.0, 10, 100, 0.1); - double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig); + double costAt100 = costFunction.computeCost(lowIdle, 100, idleOnlyConfig).totalCost(); // idlenessCost = 100 * 3600 * 0.91 = 327600 Assert.assertTrue("Cost should be finite and positive", Double.isFinite(costAt100) && costAt100 > 0); } @@ -286,8 +286,8 @@ public void testIdleRatioWithMissingData() // Negative idle ratio indicates missing data → should default to 0.5 CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0); - double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig); - double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig); + double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig).totalCost(); + double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig).totalCost(); // With missing data, predicted idle = 0.5 for all task counts // idlenessCost at 10 = 10 * 3600 * 0.5 = 18000 diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 30a0d7a723e4..ce6c87e5e082 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import 
org.apache.druid.segment.incremental.ParseExceptionReport; import javax.annotation.Nullable; @@ -86,6 +87,11 @@ default Boolean isHealthy() return null; // default implementation for interface compatability; returning null since true or false is misleading } + default SupervisorTaskAutoScaler createAutoscaler(SupervisorSpec spec) + { + return spec.createAutoscaler(this); + } + /** * Resets any stored metadata by the supervisor. * @param dataSourceMetadata optional dataSource metadata. diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java index c921e2740b87..17cd347231ae 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java @@ -24,4 +24,15 @@ public interface SupervisorTaskAutoScaler void start(); void stop(); void reset(); + + /** + * Computes the optimal task count during task rollover, allowing a non-disruptive scale-down. + * Must be called by the supervisor when tasks are ending their duration. + * + * @return optimal task count for scale-down, or -1 if no change needed + */ + default int computeTaskCountForRollover() + { + return -1; + } }