diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d5d6bb45139..78ec52cf7b1 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -286,6 +286,9 @@ topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.stats.ewma.enable: false
topology.stats.ewma.smoothing.factor: 0.0625
+topology.upstream.feedback.freq.secs: 10
+topology.upstream.feedback.stream: "__feedback"
+topology.upstream.feedback.enable: false
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JitterAwareGroupingTopology.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JitterAwareGroupingTopology.java
new file mode 100644
index 00000000000..935a82351ea
--- /dev/null
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JitterAwareGroupingTopology.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.perf;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.JitterAwareStreamGrouping;
+import org.apache.storm.perf.spout.FileReadSpout;
+import org.apache.storm.perf.utils.Helper;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Benchmark for {@link JitterAwareStreamGrouping} in a word-count pipeline where worker tasks have
+ * artificially skewed processing latencies.
+ *
+ *
{@code JitteryWorkerBolt} tasks have task-index-dependent processing delays. Task 0 is fast;
+ * each subsequent task is progressively slower. This mimics real-world conditions (GC pressure,
+ * I/O, resource contention) where downstream tasks diverge in responsiveness. With upstream
+ * feedback enabled, {@link JitterAwareStreamGrouping} routes more tuples to the fastest tasks,
+ * improving throughput and reducing complete latency by ≥10% compared to plain round-robin.
+ *
+ *
Run the baseline and the jitter-aware run back-to-back to compare:
+ *
+ * # Baseline: JitterAwareStreamGrouping falls back to round-robin (no feedback stats).
+ * storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \
+ * -c topology.upstream.feedback.enable=false
+ *
+ * # Jitter-aware: grouping steers tuples to lowest-jitter workers.
+ * storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \
+ * -c topology.upstream.feedback.enable=true
+ *
+ *
+ *
Tuning knobs (pass with {@code -c key=value}):
+ *
+ *
{@code spout.count} — number of spout tasks (default 1)
+ *
{@code splitter.count} — number of splitter tasks (default 2)
+ *
{@code worker.count} — number of jittery worker tasks (default 4)
+ *
{@code sink.count} — number of sink tasks (default 1)
+ *
{@code input.file} — path to a text file whose lines are treated as sentences
+ * (e.g. {@code src/main/sampledata/randomwords.txt})
+ *
{@code worker.base.delay.us} — per-index processing delay step in µs (default 2000).
+ * Task {@code i} parks for {@code i * base} µs plus up to {@code base} µs of random noise,
+ * so a 4-worker setup has delays of ~0-2, ~2-4, ~4-6, ~6-8 ms. Must be ≥1000 µs so
+ * the EWMA latency gauge (millisecond resolution) sees distinct values per task.
+ *
+ */
+public class JitterAwareGroupingTopology {
+
+ public static final String TOPOLOGY_NAME = "JitterAwareGroupingTopology";
+
+ static final String SPOUT_ID = "gen";
+ static final String SPLITTER_ID = "splitter";
+ static final String WORKER_ID = "worker";
+ static final String SINK_ID = "sink";
+
+ static final String SPOUT_NUM = "spout.count";
+ static final String SPLITTER_NUM = "splitter.count";
+ static final String WORKER_NUM = "worker.count";
+ static final String SINK_NUM = "sink.count";
+ static final String INPUT_FILE = "input.file";
+ static final String WORKER_BASE_DELAY_US = "worker.base.delay.us";
+
+ private static final String FIELD_SENTENCE = "sentence";
+ private static final String FIELD_WORD = "word";
+ private static final String FIELD_COUNT = "count";
+
+ static StormTopology getTopology(Map conf) {
+ int spouts = Helper.getInt(conf, SPOUT_NUM, 1);
+ int splitters = Helper.getInt(conf, SPLITTER_NUM, 2);
+ int workers = Helper.getInt(conf, WORKER_NUM, 4);
+ int sinks = Helper.getInt(conf, SINK_NUM, 1);
+ long baseDelayUs = Helper.getInt(conf, WORKER_BASE_DELAY_US, 2000);
+ String inputFile = Helper.getStr(conf, INPUT_FILE);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout(SPOUT_ID, new GenSpout(inputFile), spouts);
+ builder.setBolt(SPLITTER_ID, new SplitterBolt(), splitters)
+ .localOrShuffleGrouping(SPOUT_ID);
+ builder.setBolt(WORKER_ID, new JitteryWorkerBolt(baseDelayUs), workers)
+ .customGrouping(SPLITTER_ID, new JitterAwareStreamGrouping());
+ builder.setBolt(SINK_ID, new SinkBolt(), sinks)
+ .localOrShuffleGrouping(WORKER_ID);
+
+ return builder.createTopology();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int runTime = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
+ }
+
+ topoConf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, true);
+ topoConf.putIfAbsent(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4000);
+ topoConf.putIfAbsent(Config.TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS, 10);
+ topoConf.putAll(Utils.readCommandLineOpts());
+
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ }
+
+ /**
+ * Emits anchored sentences loaded from {@code input.file} at maximum rate. The file is read
+ * once into memory during {@link #open} and replayed in a round-robin loop. Anchoring (with a
+ * msgId) ensures Storm tracks each tuple tree to completion, so spout complete-latency is a
+ * reliable end-to-end signal.
+ */
+ private static class GenSpout extends BaseRichSpout {
+ private final String filePath;
+ private SpoutOutputCollector collector;
+ private List lines;
+ private int lineIdx;
+ private long msgId;
+
+ GenSpout(String filePath) {
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ try {
+ this.lines = FileReadSpout.readLines(new FileInputStream(filePath));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot open input file: " + filePath, e);
+ }
+ if (lines.isEmpty()) {
+ throw new RuntimeException("Input file is empty: " + filePath);
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ String sentence = lines.get(lineIdx++ % lines.size());
+ collector.emit(new Values(sentence), ++msgId);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(FIELD_SENTENCE));
+ }
+ }
+
+ /**
+ * Splits each incoming sentence into words and emits one tuple per word, anchored so the ack
+ * tree extends to the downstream worker.
+ */
+ private static class SplitterBolt extends BaseRichBolt {
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split("\\s+")) {
+ collector.emit(tuple, new Values(word));
+ }
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(FIELD_WORD));
+ }
+ }
+
+ /**
+ * Counts words and adds a task-index-proportional processing delay to produce deliberately
+ * skewed jitter profiles across tasks.
+ *
+ *
Task {@code i} parks for {@code i * baseDelayUs} µs plus up to {@code baseDelayUs} µs of
+ * random noise per tuple. For a 4-task setup with the default {@code baseDelayUs = 2000}:
+ *
+ *
Task 0: ~0–2 ms (fast)
+ *
Task 1: ~2–4 ms
+ *
Task 2: ~4–6 ms
+ *
Task 3: ~6–8 ms (slow)
+ *
+ * Delays are millisecond-scale so the EWMA execute-latency gauge (which stores values in ms)
+ * records distinct values per task. {@link JitterAwareStreamGrouping} then steers tuples toward
+ * the task with the lowest EWMA execute-latency when feedback is enabled.
+ *
+ *
{@link LockSupport#parkNanos} is used instead of a spin loop so slow tasks yield the CPU
+ * and do not starve the fast task's executor thread.
+ */
+ private static class JitteryWorkerBolt extends BaseRichBolt {
+ private final long baseDelayUs;
+ private OutputCollector collector;
+ private long taskBaselineNs;
+ private long baseDelayNs;
+ private final Map counts = new HashMap<>();
+
+ JitteryWorkerBolt(long baseDelayUs) {
+ this.baseDelayUs = baseDelayUs;
+ }
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.baseDelayNs = baseDelayUs * 1_000L;
+ // Higher task indices get proportionally larger baseline delays.
+ this.taskBaselineNs = baseDelayNs * context.getThisTaskIndex();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ String word = tuple.getString(0);
+ counts.merge(word, 1, Integer::sum);
+ int count = counts.get(word);
+
+ long noiseNs = (long) (ThreadLocalRandom.current().nextDouble() * baseDelayNs);
+ long sleepNs = taskBaselineNs + noiseNs;
+ if (sleepNs > 0) {
+ LockSupport.parkNanos(sleepNs);
+ }
+
+ // Emit anchored so the ack chain continues to SinkBolt, and so execute/process jitter
+ // is measured and reported back to SplitterBolt via upstream feedback.
+ collector.emit(tuple, new Values(word, count));
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields(FIELD_WORD, FIELD_COUNT));
+ }
+ }
+
+ /**
+ * Terminal bolt: acks each tuple to complete the tuple tree and drive spout complete-latency.
+ */
+ private static class SinkBolt extends BaseRichBolt {
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // terminal — no output
+ }
+ }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 8d3fc51ccdd..d25e6a16ad3 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -608,8 +608,45 @@ public class Config extends HashMap {
*
* @see RFC 1889 §A.8
*/
- @CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class)
+ @CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class)
public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing.factor";
+ /**
+ * Flag to enable or disable the feedback channel for upstream communication.
+ * When true, components can send unanchored tuples back to their source tasks.
+ */
+ @IsBoolean
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE = "topology.upstream.feedback.enable";
+ /**
+ * The specific stream ID used for upstream feedback communication.
+ * Defaults to "__feedback" if not explicitly configured.
+ */
+ @IsString
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream";
+ /**
+ * The period, in seconds, between upstream feedback messages within the topology.
+ *
+ *
A dedicated internal feedback tick fires on this interval; on each tick a task emits
+ * a feedback tuple (containing metrics such as EWMA jitter stats) back to its parent tasks.
+ * This mechanism allows parent tasks to receive performance signals from downstream
+ * components to facilitate adaptive flow control or load balancing. Unlike a probabilistic
+ * trigger, the period yields a deterministic, data-volume-independent feedback cadence.
+ *
+ *
Validation: Must be a positive integer (seconds).
+ *
+ *
Impact:
+ *
+ *
Lower values provide more precise, real-time performance data but increase
+ * network overhead and CPU usage on the control plane.
+ *
Higher values minimize the "observer effect" on the topology's throughput
+ * while still providing periodic statistical snapshots of health.
+ *
+ *
+ *
+ * Defaults to 10 if not explicitly configured.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ public static final String TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS = "topology.upstream.feedback.freq.secs";
/**
* The time period that builtin metrics data in bucketed into.
*/
@@ -1901,7 +1938,6 @@ public class Config extends HashMap {
public static final String STORM_MESSAGING_NETTY_TLS_SSL_PROTOCOLS = "storm.messaging.netty.tls.ssl.protocols";
/**
- * /**
* Netty based messaging: The number of milliseconds that a Netty client will retry flushing messages that are already
* buffered to be sent.
*/
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index c3cd0808709..c7790e56fa3 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -28,6 +28,7 @@ public class Constants {
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics_";
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
+ public static final String FEEDBACK_TICK_STREAM_ID = "__feedback_tick";
public static final Object TOPOLOGY = "topology";
public static final String SYSTEM_TOPOLOGY = "system-topology";
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 0f8359783c2..ac558c0c9b3 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -20,6 +20,7 @@
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.Thrift;
+import org.apache.storm.executor.ChildEwmaStats;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.grouping.CustomStreamGrouping;
@@ -43,6 +44,11 @@ public void refreshLoad(LoadMapping loadMapping) {
}
+ @Override
+ public void registerEwmaStats(ChildEwmaStats childEwmaStats) {
+
+ }
+
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) {
@@ -136,6 +142,11 @@ public void refreshLoad(LoadMapping loadMapping) {
}
+ @Override
+ public void registerEwmaStats(ChildEwmaStats childEwmaStats) {
+
+ }
+
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) {
customStreamGrouping.prepare(context, stream, targetTasks);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index b3cfd90d4d6..b144e39abaf 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -360,6 +360,21 @@ public static void addEventLogger(Map conf, StormTopology topolo
topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}
+ public static void addUpstreamFeedback(Map conf, StormTopology topology) {
+ // Only invoked when hasUpstreamFeedback(conf) is true, so declare the feedback stream on every
+ // component unconditionally. The schema must match the tuple emitted by
+ // Executor.buildUpstreamFeedbackTuple: [TaskInfo, EwmaFeedbackRecord].
+ String feedbackStreamId = ConfigUtils.upstreamFeedbackStreamId(conf);
+ for (Object component : allComponents(topology).values()) {
+ ComponentCommon common = getComponentCommon(component);
+ common.put_to_streams(feedbackStreamId, Thrift.outputFields(upstreamFeedbackFields()));
+ }
+ }
+
+ public static List upstreamFeedbackFields() {
+ return Arrays.asList("task-info", "feedback");
+ }
+
@SuppressWarnings("unchecked")
public static Map metricsConsumerBoltSpecs(Map conf, StormTopology topology) {
Map metricsConsumerBolts = new HashMap<>();
@@ -429,6 +444,9 @@ public static void addSystemComponents(Map conf, StormTopology t
outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
outputStreams.put(Constants.SYSTEM_FLUSH_STREAM_ID, Thrift.outputFields(Arrays.asList()));
outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+ if (ConfigUtils.upstreamFeedbackEnable(conf)) {
+ outputStreams.put(Constants.FEEDBACK_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
+ }
Map boltConf = new HashMap<>();
boltConf.put(Config.TOPOLOGY_TASKS, 0);
@@ -464,6 +482,10 @@ public static boolean hasEventLoggers(Map topoConf) {
return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
}
+ public static boolean hasUpstreamFeedback(Map topoConf) {
+ return ConfigUtils.upstreamFeedbackEnable(topoConf);
+ }
+
public static int numStartExecutors(Object component) throws InvalidTopologyException {
ComponentCommon common = getComponentCommon(component);
return Thrift.getParallelismHint(common);
@@ -538,6 +560,9 @@ protected StormTopology systemTopologyImpl(Map topoConf, StormTo
if (hasEventLoggers(topoConf)) {
addEventLogger(topoConf, ret);
}
+ if (hasUpstreamFeedback(topoConf)) {
+ addUpstreamFeedback(topoConf, ret);
+ }
addMetricComponents(topoConf, ret);
addSystemComponents(topoConf, ret);
addMetricStreams(ret);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 45a1f2d7e3a..8020445229b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -216,6 +216,27 @@ public void sendUnanchored(String stream, List