Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
aae2554
init
GGraziadei May 3, 2026
2aad8a2
allocate suppliers once
GGraziadei May 3, 2026
5cb1936
improve jitter definition
GGraziadei May 5, 2026
5aa8264
add documentation
GGraziadei May 5, 2026
946715d
format
GGraziadei May 5, 2026
ac4e3b1
format
GGraziadei May 5, 2026
23247e3
fix checkstyle
GGraziadei May 5, 2026
ce8e2b1
fix RFC 1889 jitter definition, guarantee jitter decay if stable latency
GGraziadei May 6, 2026
f040770
minor changes
GGraziadei May 8, 2026
b29b8e3
Merge branch '8538-rfc-1889a-jitter-metric' into 8538-jitter-internal…
GGraziadei May 9, 2026
cb69a04
publish control signal based on ewma metrics
GGraziadei May 9, 2026
f5e28f4
fix typo on `ZeroOneOpenIntervalValidator`
GGraziadei May 9, 2026
451eafa
consume metrics and update internal status
GGraziadei May 9, 2026
e7d94f8
implement a cache for child tasks stats
GGraziadei May 9, 2026
35abc68
Merge branch 'master' into 8538-jitter-control-loop
GGraziadei May 20, 2026
215e3e0
feedback message serialization and path decoupling
GGraziadei May 21, 2026
150e210
Merge remote-tracking branch 'upstream/master' into 8538-jitter-contr…
GGraziadei May 24, 2026
99f53e4
feedback refactoring + JitterAwareStreamGrouping
GGraziadei May 24, 2026
7d34c90
control loop lazy update
GGraziadei May 30, 2026
a76ee8d
send feedback according to a periodic tick tuple
GGraziadei May 31, 2026
a95f5f1
Merge branch 'master' into 8538-jitter-control-loop
GGraziadei May 31, 2026
c938196
minor changes
GGraziadei Jun 6, 2026
d8e7cda
add jitter aware grouping topology bench
GGraziadei Jun 7, 2026
2b4d8ad
remove not necessary UpstreamFeedbackCompareTopo.java
GGraziadei Jun 7, 2026
babd3d9
remove unnecessary import, FT for registering FEEDBACK_TICK_STREAM
GGraziadei Jun 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.stats.ewma.enable: false
topology.stats.ewma.smoothing.factor: 0.0625
topology.upstream.feedback.freq.secs: 10
topology.upstream.feedback.stream: "__feedback"
topology.upstream.feedback.enable: false
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.storm.perf;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.JitterAwareStreamGrouping;
import org.apache.storm.perf.spout.FileReadSpout;
import org.apache.storm.perf.utils.Helper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
* Benchmark for {@link JitterAwareStreamGrouping} in a word-count pipeline where worker tasks have
* artificially skewed processing latencies.
*
* <p>Pipeline: {@code GenSpout -> SplitterBolt -> JitteryWorkerBolt -> SinkBolt}
*
* <p>{@code JitteryWorkerBolt} tasks have task-index-dependent processing delays. Task 0 is fast;
* each subsequent task is progressively slower. This mimics real-world conditions (GC pressure,
* I/O, resource contention) where downstream tasks diverge in responsiveness. With upstream
* feedback enabled, {@link JitterAwareStreamGrouping} routes more tuples to the fastest tasks,
* improving throughput and reducing complete latency by &ge;10% compared to plain round-robin.
*
* <p>Run the baseline and the jitter-aware run back-to-back to compare:
* <pre>
* # Baseline: JitterAwareStreamGrouping falls back to round-robin (no feedback stats).
* storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \
* -c topology.upstream.feedback.enable=false
*
* # Jitter-aware: grouping steers tuples to lowest-jitter workers.
* storm jar storm-perf.jar org.apache.storm.perf.JitterAwareGroupingTopology 120 \
* -c topology.upstream.feedback.enable=true
* </pre>
*
* <p>Tuning knobs (pass with {@code -c key=value}):
* <ul>
* <li>{@code spout.count} — number of spout tasks (default 1)</li>
* <li>{@code splitter.count} — number of splitter tasks (default 2)</li>
* <li>{@code worker.count} — number of jittery worker tasks (default 4)</li>
* <li>{@code sink.count} — number of sink tasks (default 1)</li>
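* <li>{@code input.file} — <b>required, no default</b>: path to a text file whose lines are
* treated as sentences (e.g. {@code src/main/sampledata/randomwords.txt}); it must be
* readable on every host that runs a spout task, so add
* {@code -c input.file=<path>} to the commands above</li>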
* <li>{@code worker.base.delay.us} — per-index processing delay step in µs (default 2000).
* Task {@code i} parks for {@code i * base} µs plus up to {@code base} µs of random noise,
* so a 4-worker setup has delays of ~0-2, ~2-4, ~4-6, ~6-8 ms. Must be &ge;1000 µs so
* the EWMA latency gauge (millisecond resolution) sees distinct values per task.</li>
* </ul>
*/
public class JitterAwareGroupingTopology {

/** Topology name that {@code main} passes to {@code Helper.runOnClusterAndPrintMetrics}. */
public static final String TOPOLOGY_NAME = "JitterAwareGroupingTopology";

// Component ids used when wiring the pipeline in getTopology().
static final String SPOUT_ID = "gen";
static final String SPLITTER_ID = "splitter";
static final String WORKER_ID = "worker";
static final String SINK_ID = "sink";

// Configuration keys looked up in the topology conf by getTopology().
// The *_NUM keys are the parallelism of the corresponding component.
static final String SPOUT_NUM = "spout.count";
static final String SPLITTER_NUM = "splitter.count";
static final String WORKER_NUM = "worker.count";
static final String SINK_NUM = "sink.count";
// Path of the sentence file read by GenSpout; getTopology() supplies no default for it.
static final String INPUT_FILE = "input.file";
// Per-task-index delay step, in microseconds, handed to JitteryWorkerBolt.
static final String WORKER_BASE_DELAY_US = "worker.base.delay.us";

// Output field names declared by the spout and bolts below.
private static final String FIELD_SENTENCE = "sentence";
private static final String FIELD_WORD = "word";
private static final String FIELD_COUNT = "count";

/**
 * Builds the benchmark pipeline {@code gen -> splitter -> worker -> sink}.
 *
 * <p>The splitter and the sink subscribe with local-or-shuffle grouping; the worker subscribes
 * to the splitter through a {@link JitterAwareStreamGrouping} custom grouping, which is the
 * grouping under test.
 *
 * @param conf topology configuration holding the tuning knobs ({@code spout.count},
 *             {@code splitter.count}, {@code worker.count}, {@code sink.count},
 *             {@code input.file}, {@code worker.base.delay.us})
 * @return the assembled topology
 * @throws IllegalArgumentException if {@code input.file} is missing or blank, or if
 *             {@code worker.base.delay.us} is negative
 */
static StormTopology getTopology(Map<String, Object> conf) {
    // Fail at build time instead of letting every spout task crash in open() on the cluster.
    String inputFile = Helper.getStr(conf, INPUT_FILE);
    if (inputFile == null || inputFile.trim().isEmpty()) {
        throw new IllegalArgumentException(
            "Missing required config '" + INPUT_FILE + "' (path to the sentence file)");
    }

    // A negative step would make every computed sleep non-positive and silently remove the skew
    // this benchmark exists to create.
    long baseDelayUs = Helper.getInt(conf, WORKER_BASE_DELAY_US, 2000);
    if (baseDelayUs < 0) {
        throw new IllegalArgumentException(
            "'" + WORKER_BASE_DELAY_US + "' must not be negative, got " + baseDelayUs);
    }

    int spouts = Helper.getInt(conf, SPOUT_NUM, 1);
    int splitters = Helper.getInt(conf, SPLITTER_NUM, 2);
    int workers = Helper.getInt(conf, WORKER_NUM, 4);
    int sinks = Helper.getInt(conf, SINK_NUM, 1);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(SPOUT_ID, new GenSpout(inputFile), spouts);
    builder.setBolt(SPLITTER_ID, new SplitterBolt(), splitters)
        .localOrShuffleGrouping(SPOUT_ID);
    builder.setBolt(WORKER_ID, new JitteryWorkerBolt(baseDelayUs), workers)
        .customGrouping(SPLITTER_ID, new JitterAwareStreamGrouping());
    builder.setBolt(SINK_ID, new SinkBolt(), sinks)
        .localOrShuffleGrouping(WORKER_ID);

    return builder.createTopology();
}

/**
 * Command-line entry point: {@code [runDurationSec] [optionalConfFile]}.
 *
 * <p>Configuration is layered in this order, later layers overriding earlier ones: the optional
 * conf file, the benchmark's own settings (EWMA stats forced on; max spout pending and the
 * feedback period only filled in when absent), and finally whatever
 * {@code Utils.readCommandLineOpts()} returns.
 *
 * @param args optional run duration in seconds (defaults to {@code -1} when omitted) and an
 *             optional configuration file
 * @throws Exception if the arguments cannot be parsed, the conf file cannot be read, or the
 *             topology cannot be built or run
 */
public static void main(String[] args) throws Exception {
    // Validate the invocation before parsing anything or touching the file system, so a bad
    // call always ends with the usage message rather than an unrelated exception.
    if (args.length > 2) {
        System.err.println("args: [runDurationSec] [optionalConfFile]");
        return;
    }

    int runTime = args.length > 0 ? Integer.parseInt(args[0]) : -1;

    Config topoConf = new Config();
    if (args.length > 1) {
        topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
    }

    topoConf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, true);
    topoConf.putIfAbsent(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4000);
    topoConf.putIfAbsent(Config.TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS, 10);
    // Applied last on purpose: these options win over everything set above.
    topoConf.putAll(Utils.readCommandLineOpts());

    Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}

/**
 * Spout that replays the lines of a text file as sentences, one per {@link #nextTuple} call.
 *
 * <p>The file is loaded into memory once in {@link #open} and then cycled through in order,
 * wrapping back to the first line after the last. Every tuple is emitted with a message id
 * (an incrementing counter) so that it takes part in Storm's tuple tracking.
 */
private static class GenSpout extends BaseRichSpout {
    private final String filePath;
    private SpoutOutputCollector collector;
    private List<String> lines;
    // Index of the next line to emit; always kept inside [0, lines.size()).
    private int lineIdx;
    private long msgId;

    /**
     * Creates the spout.
     *
     * @param filePath path of the sentence file, resolved on the host that runs the task
     */
    GenSpout(String filePath) {
        this.filePath = filePath;
    }

    /**
     * Loads the whole input file into memory.
     *
     * @throws IllegalStateException if no file path was configured
     * @throws RuntimeException if the file cannot be read or contains no lines
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        if (filePath == null) {
            // Without this check FileInputStream fails with a message-less NullPointerException.
            throw new IllegalStateException("No input file configured; set '" + INPUT_FILE + "'");
        }
        // try-with-resources guarantees the descriptor is released on every path, independently
        // of what readLines does with the stream; closing an already closed stream is harmless.
        try (FileInputStream in = new FileInputStream(filePath)) {
            this.lines = FileReadSpout.readLines(in);
        } catch (IOException e) {
            throw new RuntimeException("Cannot open input file: " + filePath, e);
        }
        if (lines.isEmpty()) {
            throw new RuntimeException("Input file is empty: " + filePath);
        }
    }

    /** Emits the next sentence with a fresh message id. */
    @Override
    public void nextTuple() {
        String sentence = lines.get(lineIdx);
        // Wrap the index itself rather than applying % to an ever-growing counter: once such a
        // counter overflows int, % returns a negative value and lines.get(...) throws.
        lineIdx = (lineIdx + 1) % lines.size();
        collector.emit(new Values(sentence), ++msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FIELD_SENTENCE));
    }
}

/**
 * Splits each incoming sentence on whitespace and emits one tuple per word.
 *
 * <p>Every word tuple is anchored to the input tuple, and the input is acked once all of its
 * words have been emitted. Blank sentences produce no output.
 */
private static class SplitterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits the words of the sentence held in field 0, then acks the sentence.
     *
     * @param tuple input tuple whose first field is the sentence
     */
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split("\\s+")) {
            // split() yields an empty first token for an empty line or one that starts with
            // whitespace; forwarding it would make downstream count "" as a word.
            if (!word.isEmpty()) {
                collector.emit(tuple, new Values(word));
            }
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FIELD_WORD));
    }
}

/**
 * Counts words and adds a task-index-proportional processing delay, so that the tasks of this
 * bolt show deliberately different latency profiles.
 *
 * <p>Task {@code i} pauses for {@code i * baseDelayUs} µs plus a uniformly random amount below
 * {@code baseDelayUs} µs on every tuple. With 4 tasks and {@code baseDelayUs = 2000}:
 * <ul>
 * <li>Task 0: ~0–2 ms (fast)</li>
 * <li>Task 1: ~2–4 ms</li>
 * <li>Task 2: ~4–6 ms</li>
 * <li>Task 3: ~6–8 ms (slow)</li>
 * </ul>
 *
 * <p>The pause uses {@link LockSupport#parkNanos} rather than a spin loop, so a delayed task
 * gives up the CPU while it waits.
 */
private static class JitteryWorkerBolt extends BaseRichBolt {
    private final long baseDelayUs;
    private OutputCollector collector;
    private long taskBaselineNs;
    private long baseDelayNs;
    private final Map<String, Integer> counts = new HashMap<>();

    /**
     * Creates the bolt.
     *
     * @param baseDelayUs delay step in microseconds; also the upper bound of the random noise
     */
    JitteryWorkerBolt(long baseDelayUs) {
        this.baseDelayUs = baseDelayUs;
    }

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.baseDelayNs = baseDelayUs * 1_000L;
        // Higher task indices get proportionally larger baseline delays.
        this.taskBaselineNs = baseDelayNs * context.getThisTaskIndex();
    }

    /**
     * Updates the running count of the word in field 0, pauses for this task's delay, then
     * emits {@code (word, count)} anchored to the input and acks the input.
     *
     * @param tuple input tuple whose first field is the word
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        // merge() returns the updated value, so no second map lookup is needed.
        int count = counts.merge(word, 1, Integer::sum);

        long noiseNs = (long) (ThreadLocalRandom.current().nextDouble() * baseDelayNs);
        pause(taskBaselineNs + noiseNs);

        // Anchored emit keeps the input's tuple tree open until the downstream bolt acks.
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }

    /**
     * Parks the calling thread until {@code delayNs} nanoseconds have elapsed.
     *
     * <p>{@code parkNanos} may return early for no reason, so the remaining time is re-parked
     * until the deadline. If the thread is interrupted the wait ends immediately (parking would
     * return at once anyway) and the interrupt status is left set for the caller.
     */
    private static void pause(long delayNs) {
        if (delayNs <= 0) {
            return;
        }
        final long deadlineNs = System.nanoTime() + delayNs;
        long remainingNs = delayNs;
        while (remainingNs > 0 && !Thread.currentThread().isInterrupted()) {
            LockSupport.parkNanos(remainingNs);
            remainingNs = deadlineNs - System.nanoTime();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FIELD_WORD, FIELD_COUNT));
    }
}

/**
 * End of the pipeline. It emits nothing and simply acknowledges whatever it receives, which
 * closes the tuple tree started by the spout.
 */
private static class SinkBolt extends BaseRichBolt {
    private OutputCollector acker;

    /** Keeps the collector; it is the only state this bolt needs. */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext ctx, OutputCollector outputCollector) {
        acker = outputCollector;
    }

    /** Acknowledges the received tuple without inspecting its contents. */
    @Override
    public void execute(Tuple input) {
        acker.ack(input);
    }

    /** Declares no streams: nothing is emitted downstream of the sink. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer unused) {
        // intentionally empty
    }
}
}
40 changes: 38 additions & 2 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,45 @@ public class Config extends HashMap<String, Object> {
*
* @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 §A.8</a>
*/
@CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class)
@CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class)
public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing.factor";
/**
* Flag to enable or disable the feedback channel for upstream communication.
* When true, components can send unanchored tuples back to their source tasks.
*/
@IsBoolean
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE = "topology.upstream.feedback.enable";
/**
* The specific stream ID used for upstream feedback communication.
* Defaults to "__feedback" if not explicitly configured.
*/
@IsString
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_STREAM_ID = "topology.upstream.feedback.stream";
/**
* The period, in seconds, between upstream feedback messages within the topology.
*
* <p>A dedicated internal feedback tick fires on this interval; on each tick a task emits
* a feedback tuple (containing metrics such as EWMA jitter stats) back to its parent tasks.
* This mechanism allows parent tasks to receive performance signals from downstream
* components to facilitate adaptive flow control or load balancing. Unlike a probabilistic
* trigger, the period yields a deterministic, data-volume-independent feedback cadence.</p>
*
* <p><b>Validation:</b> Must be a positive integer (seconds).</p>
*
* <p><b>Impact:</b></p>
* <ul>
* <li>Lower values provide more precise, real-time performance data but increase
* network overhead and CPU usage on the control plane.</li>
* <li>Higher values minimize the "observer effect" on the topology's throughput
* while still providing periodic statistical snapshots of health.</li>
* </ul>
*
* Defaults to 10 if not explicitly configured.
*/
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS = "topology.upstream.feedback.freq.secs";
/**
* The time period that builtin metrics data is bucketed into.
*/
Expand Down Expand Up @@ -1901,7 +1938,6 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_MESSAGING_NETTY_TLS_SSL_PROTOCOLS = "storm.messaging.netty.tls.ssl.protocols";

/**
* /**
* Netty based messaging: The number of milliseconds that a Netty client will retry flushing messages that are already
* buffered to be sent.
*/
Expand Down
1 change: 1 addition & 0 deletions storm-client/src/jvm/org/apache/storm/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class Constants {
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics_";
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
public static final String FEEDBACK_TICK_STREAM_ID = "__feedback_tick";

public static final Object TOPOLOGY = "topology";
public static final String SYSTEM_TOPOLOGY = "system-topology";
Expand Down
Loading