From 623288fd56926825a881d90c0a6c49c55a37d220 Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Tue, 2 Jun 2026 22:19:55 +0200 Subject: [PATCH 1/5] init --- docs/Serialization.md | 21 +++++- flux/README.md | 39 +++++++++++ .../org/apache/storm/flux/FluxBuilder.java | 12 ++++ .../apache/storm/flux/model/VertexDef.java | 13 ++++ .../java/org/apache/storm/flux/TCKTest.java | 67 +++++++++++++++++++ .../component-config-invalid-key-test.yaml | 40 +++++++++++ .../component-config-invalid-test.yaml | 40 +++++++++++ .../component-config-missing-test.yaml | 39 +++++++++++ .../configs/component-config-test.yaml | 48 +++++++++++++ 9 files changed, 316 insertions(+), 3 deletions(-) create mode 100644 flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml create mode 100644 flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml create mode 100644 flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml create mode 100644 flux/flux-core/src/test/resources/configs/component-config-test.yaml diff --git a/docs/Serialization.md b/docs/Serialization.md index e7af57477ab..8f87ba6b043 100644 --- a/docs/Serialization.md +++ b/docs/Serialization.md @@ -90,9 +90,24 @@ You can also enable it topology-wide (or cluster-wide via `storm.yaml`) by setti #### Flux -> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism — `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition. +[Flux](flux.html) supports per-component configuration. 
In addition to parallelism, number of tasks, memory/CPU load, and groupings, each spout and bolt definition accepts a `config:` block that `FluxBuilder` applies to the underlying declarer via `addConfigurations(...)`. This is the Flux equivalent of `declarer.addConfiguration(...)`, so you can enable compression for just the components that emit large tuples: -To enable compression for a Flux topology, set it in the topology-level `config:` block: +```yaml +spouts: + - id: "file-read-spout" + className: "org.apache.storm.perf.spout.FileReadSpout" + parallelism: 1 + # enable compression for this spout only + config: + topology.tuple.compression.enable: true + +bolts: + - id: "split" + className: "org.apache.storm.perf.bolt.SplitSentenceBolt" + parallelism: 1 +``` + +You can also enable it topology-wide by setting it in the topology-level `config:` block: ```yaml config: @@ -100,7 +115,7 @@ config: topology.tuple.compression.threshold: 1460 ``` -Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold. +Be aware that the topology-wide form enables compression for *every* remote-bound tuple in the topology that exceeds the threshold. #### Configuration reference diff --git a/flux/README.md b/flux/README.md index 83192563306..360e3bc654a 100644 --- a/flux/README.md +++ b/flux/README.md @@ -574,6 +574,17 @@ component when the topology is deployed. Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as well. +In addition to `parallelism`, spout and bolt definitions support the following optional parameters that map directly to +the underlying Storm `BoltDeclarer`/`SpoutDeclarer`: + +| Parameter | Description | +|---------------------|------------------------------------------------------------------------------------------| +| `numTasks` | The number of tasks for the component (`setNumTasks`). 
| +| `onHeapMemoryLoad` | The on-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). | +| `offHeapMemoryLoad` | The off-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). | +| `cpuLoad` | The CPU load for resource-aware scheduling (`setCPULoad`). | +| `config` | A map of configuration parameters applied only to this component (`addConfigurations`). | + Shell spout example: ```yaml @@ -656,6 +667,34 @@ bolts: parallelism: 1 # ... ``` + +### Per-Component Configuration +In addition to the topology-wide [Topology Config](#topology-config), each spout and bolt can declare its own `config` +map. These configurations are applied to that component only, via the declarer's `addConfigurations(...)` method, +following Storm's native support for component-level configuration. This avoids enabling a configuration topology-wide +when only a single component requires it. + +```yaml +spouts: + - id: "sentence-spout" + className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout" + parallelism: 1 + # configuration applied to this spout only + config: + topology.max.spout.pending: 1000 + +bolts: + - id: "log" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + # configuration applied to this bolt only + config: + topology.tuple.compression.enable: true +``` + +Known Storm configuration keys are validated when the topology is built, so an invalid value (for example, a +non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. + ## Streams and Stream Groupings Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in a topology, with an associated Grouping definition. 
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 338994baa0d..a81d60606f0 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -48,6 +48,7 @@ import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.hooks.IWorkerHook; import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.ComponentConfigurationDeclarer; import org.apache.storm.topology.IBasicBolt; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; @@ -57,6 +58,7 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; +import org.apache.storm.validation.ConfigValidation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -235,6 +237,7 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui if (boltDef.getNumTasks() > -1) { declarer.setNumTasks(boltDef.getNumTasks()); } + applyComponentConfig(boltDef.getConfig(), declarer); GroupingDef grouping = stream.getGrouping(); // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream @@ -456,6 +459,7 @@ private static void buildSpouts(ExecutionContext context, TopologyBuilder builde if (sd.getNumTasks() > -1) { declarer.setNumTasks(sd.getNumTasks()); } + applyComponentConfig(sd.getConfig(), declarer); context.addSpout(sd.getId(), spout); } @@ -470,6 +474,14 @@ private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) thr return (IRichSpout) buildObject(def, context); } + private static void applyComponentConfig(Map config, ComponentConfigurationDeclarer declarer) { + if (config == null || config.isEmpty()) { + return; + } + ConfigValidation.validateFields(config); + declarer.addConfigurations(config); + } + /** * Given 
a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key. * Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible. diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java index cd09d052abb..c31a3953dbb 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java @@ -18,6 +18,9 @@ package org.apache.storm.flux.model; +import java.util.HashMap; +import java.util.Map; + /** * Abstract parent class of component definitions. * (spouts/bolts) @@ -30,6 +33,8 @@ public abstract class VertexDef extends BeanDef { private int onHeapMemoryLoad = -1; private int offHeapMemoryLoad = -1; private int cpuLoad = -1; + // per-component configuration + private Map config = new HashMap(); public int getParallelism() { return parallelism; @@ -70,4 +75,12 @@ public int getCpuLoad() { public void setCpuLoad(int cpuLoad) { this.cpuLoad = cpuLoad; } + + public Map getConfig() { + return config; + } + + public void setConfig(Map config) { + this.config = config; + } } diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index 2fd755e5ec6..cfe6c9bf49a 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -23,11 +23,13 @@ import org.apache.storm.flux.model.TopologyDef; import org.apache.storm.flux.parser.FluxParser; import org.apache.storm.flux.test.TestBolt; +import org.apache.storm.shade.net.minidev.json.JSONValue; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import java.util.Collections; 
+import java.util.Map; import java.util.Properties; public class TCKTest { @@ -103,6 +105,71 @@ public void testDiamondTopology() throws Exception { topology.validate(); } + @Test + public void testComponentConfig() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-test.yaml", + false, true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + topology.validate(); + + Map spoutConf = (Map) JSONValue.parse( + topology.get_spouts().get("spout-1").get_common().get_json_conf()); + assertEquals(Boolean.TRUE, spoutConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + + Map boltConf = (Map) JSONValue.parse( + topology.get_bolts().get("bolt-1").get_common().get_json_conf()); + assertEquals(Boolean.FALSE, boltConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + } + + @Test + public void testComponentConfigWithInvalidValue() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-invalid-test.yaml", false, + true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + + IllegalArgumentException expectedException = assertThrows(IllegalArgumentException.class, + () -> FluxBuilder.buildTopology(context)); + assertTrue(expectedException.getMessage().contains("must be of type")); + assertTrue(expectedException.getMessage().contains("Boolean")); + } + + @Test + public void testComponentConfigWithNotRegisteredKey() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-invalid-key-test.yaml", false, + true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + + StormTopology topology = 
FluxBuilder.buildTopology(context); + Map boltConf = (Map) JSONValue.parse( + topology.get_bolts().get("bolt-1").get_common().get_json_conf()); + // properties added for custom purposes and persisted in the component conf + assertTrue(boltConf.containsKey("MY.INAVLID.KEY")); + } + + @Test + public void testComponentConfigMissing() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-missing-test.yaml", false, + true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + topology.validate(); + + Map spoutConf = (Map) JSONValue.parse( + topology.get_spouts().get("spout-1").get_common().get_json_conf()); + assertTrue(spoutConf == null || !spoutConf.containsKey(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + + Map boltConf = (Map) JSONValue.parse( + topology.get_bolts().get("bolt-1").get_common().get_json_conf()); + assertTrue(boltConf == null || !boltConf.containsKey(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + } + @Test public void testBadHbase() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/bad_hbase.yaml", false, true, null, false); diff --git a/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml new file mode 100644 index 00000000000..7275a8476d4 --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/component-config-invalid-key-test.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +name: "component-config-invalid-topology" + +config: + topology.workers: 1 + +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + config: + MY.INAVLID.KEY: "none" + +streams: + - from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE diff --git a/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml new file mode 100644 index 00000000000..260f501eb5b --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/component-config-invalid-test.yaml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +name: "component-config-invalid-topology" + +config: + topology.workers: 1 + +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + config: + topology.tuple.compression.enable: "not-a-boolean" + +streams: + - from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE diff --git a/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml new file mode 100644 index 00000000000..1ce783a2fda --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/component-config-missing-test.yaml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- + +name: "component-config-missing-topology" + +config: + topology.workers: 1 + +# neither the spout nor the bolt declares a per-component config block +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + +streams: + - from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE diff --git a/flux/flux-core/src/test/resources/configs/component-config-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-test.yaml new file mode 100644 index 00000000000..042e2b5b323 --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/component-config-test.yaml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- + +name: "component-config-topology" + +# topology configuration (global scope) +config: + topology.workers: 1 + +# spout definitions +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + # per-component configuration applied to this spout only + config: + topology.tuple.compression.enable: true + +# bolt definitions +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + # per-component configuration applied to this bolt only + config: + topology.tuple.compression.enable: false + +#stream definitions +streams: + - from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE From 0b3f952740bfb0426782f7116b75dcee0770f090 Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 6 Jun 2026 13:53:47 +0200 Subject: [PATCH 2/5] apply bolt/spout conf once, when the declarer is defined for the first stream --- .../org/apache/storm/flux/FluxBuilder.java | 74 ++++++++----------- 1 file changed, 29 insertions(+), 45 deletions(-) diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index a81d60606f0..371935a7021 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory; public class FluxBuilder { - private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class); + private static final Logger LOG = LoggerFactory.getLogger(FluxBuilder.class); /** @@ -187,57 +187,41 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui for (StreamDef stream : topologyDef.getStreams()) { Object boltObj = context.getBolt(stream.getTo()); BoltDeclarer declarer = declarers.get(stream.getTo()); - if (boltObj instanceof IRichBolt) { - if (declarer == null) { - declarer = builder.setBolt(stream.getTo(), - (IRichBolt) 
boltObj, + boolean newDeclarer = declarer == null; + if (newDeclarer) { + declarer = switch (boltObj) { + case IRichBolt b -> builder.setBolt(stream.getTo(), b, topologyDef.parallelismForBolt(stream.getTo())); - declarers.put(stream.getTo(), declarer); - } - } else if (boltObj instanceof IBasicBolt) { - if (declarer == null) { - declarer = builder.setBolt( - stream.getTo(), - (IBasicBolt) boltObj, + case IBasicBolt b -> builder.setBolt(stream.getTo(), b, topologyDef.parallelismForBolt(stream.getTo())); - declarers.put(stream.getTo(), declarer); - } - } else if (boltObj instanceof IWindowedBolt) { - if (declarer == null) { - declarer = builder.setBolt( - stream.getTo(), - (IWindowedBolt) boltObj, + case IWindowedBolt b -> builder.setBolt(stream.getTo(), b, topologyDef.parallelismForBolt(stream.getTo())); - declarers.put(stream.getTo(), declarer); - } - } else if (boltObj instanceof IStatefulBolt) { - if (declarer == null) { - declarer = builder.setBolt( - stream.getTo(), - (IStatefulBolt) boltObj, + case IStatefulBolt b -> builder.setBolt(stream.getTo(), b, topologyDef.parallelismForBolt(stream.getTo())); - declarers.put(stream.getTo(), declarer); + default -> throw new IllegalArgumentException("Class does not appear to be a bolt: " + + boltObj.getClass().getName()); + }; + // resource and config declarations apply to the bolt as a whole, so only apply them once + // when the declarer is first created rather than on every incoming stream + BoltDef boltDef = topologyDef.getBoltDef(stream.getTo()); + if (boltDef.getOnHeapMemoryLoad() > -1) { + if (boltDef.getOffHeapMemoryLoad() > -1) { + declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad()); + } else { + declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad()); + } } - } else { - throw new IllegalArgumentException("Class does not appear to be a bolt: " - + boltObj.getClass().getName()); - } - - BoltDef boltDef = topologyDef.getBoltDef(stream.getTo()); - if (boltDef.getOnHeapMemoryLoad() > 
-1) { - if (boltDef.getOffHeapMemoryLoad() > -1) { - declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad()); - } else { - declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad()); + if (boltDef.getCpuLoad() > -1) { + declarer.setCPULoad(boltDef.getCpuLoad()); } + if (boltDef.getNumTasks() > -1) { + declarer.setNumTasks(boltDef.getNumTasks()); + } + applyComponentConfig(boltDef.getConfig(), declarer); + + // persist in declares cache + declarers.put(stream.getTo(), declarer); } - if (boltDef.getCpuLoad() > -1) { - declarer.setCPULoad(boltDef.getCpuLoad()); - } - if (boltDef.getNumTasks() > -1) { - declarer.setNumTasks(boltDef.getNumTasks()); - } - applyComponentConfig(boltDef.getConfig(), declarer); GroupingDef grouping = stream.getGrouping(); // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream From 4349aedb01cfd125b2aed7a1bab794b62c0cce5f Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 6 Jun 2026 14:00:56 +0200 Subject: [PATCH 3/5] flux topology conf validation (fail fast) --- .../org/apache/storm/flux/FluxBuilder.java | 5 +++- .../apache/storm/flux/FluxBuilderTest.java | 29 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 371935a7021..b715727bb31 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -74,7 +74,10 @@ public class FluxBuilder { public static Config buildConfig(TopologyDef topologyDef) { // merge contents of `config` into topology config Config conf = new Config(); - conf.putAll(topologyDef.getConfig()); + Map topologyConfig = topologyDef.getConfig(); + // validate the topology-wide config so invalid values fail fast + ConfigValidation.validateFields(topologyConfig); + 
conf.putAll(topologyConfig); return conf; } diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java index 4f5428ec63c..f3b82dc2abc 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java @@ -17,8 +17,15 @@ */ package org.apache.storm.flux; +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.flux.model.TopologyDef; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class FluxBuilderTest { @@ -29,4 +36,26 @@ public void testIsPrimitiveNumber() { assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class)); assertFalse(FluxBuilder.isPrimitiveNumber(String.class)); } + + @Test + public void testBuildConfigAcceptsValidTopologyConfig() { + Map config = new HashMap<>(); + config.put(Config.TOPOLOGY_WORKERS, 4); + TopologyDef topologyDef = new TopologyDef(); + topologyDef.setConfig(config); + + Config result = FluxBuilder.buildConfig(topologyDef); + assertEquals(4, result.get(Config.TOPOLOGY_WORKERS)); + } + + @Test + public void testBuildConfigRejectsInvalidTopologyConfig() { + // topology.workers must be a positive integer; a String value is invalid + Map config = new HashMap<>(); + config.put(Config.TOPOLOGY_WORKERS, "not-a-number"); + TopologyDef topologyDef = new TopologyDef(); + topologyDef.setConfig(config); + + assertThrows(IllegalArgumentException.class, () -> FluxBuilder.buildConfig(topologyDef)); + } } From 3d31642c76d92fb5bb6b46f9093e1c8984b9f232 Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 6 Jun 2026 14:12:19 +0200 Subject: [PATCH 4/5] minor changes --- 
flux/README.md | 3 ++- .../src/main/java/org/apache/storm/flux/model/VertexDef.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flux/README.md b/flux/README.md index 360e3bc654a..bf41e3d3689 100644 --- a/flux/README.md +++ b/flux/README.md @@ -693,7 +693,8 @@ bolts: ``` Known Storm configuration keys are validated when the topology is built, so an invalid value (for example, a -non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. +non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. Unknown/custom +keys are not validated and are passed through verbatim; validation runs client-side at build time. ## Streams and Stream Groupings Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java index c31a3953dbb..1214957d3b1 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java @@ -34,7 +34,7 @@ public abstract class VertexDef extends BeanDef { private int offHeapMemoryLoad = -1; private int cpuLoad = -1; // per-component configuration - private Map config = new HashMap(); + private Map config = new HashMap<>(); public int getParallelism() { return parallelism; From 0e49fd661e45527852f85ee3663fac30e24c21e4 Mon Sep 17 00:00:00 2001 From: Gianluca Graziadei Date: Sat, 6 Jun 2026 14:35:23 +0200 Subject: [PATCH 5/5] add topology config override test case --- .../java/org/apache/storm/flux/TCKTest.java | 27 +++++++++++ .../component-config-override-test.yaml | 47 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 flux/flux-core/src/test/resources/configs/component-config-override-test.yaml diff --git 
a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java index cfe6c9bf49a..2135046648b 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java @@ -24,6 +24,7 @@ import org.apache.storm.flux.parser.FluxParser; import org.apache.storm.flux.test.TestBolt; import org.apache.storm.shade.net.minidev.json.JSONValue; +import org.apache.storm.utils.Utils; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; import static org.hamcrest.CoreMatchers.is; @@ -124,6 +125,32 @@ public void testComponentConfig() throws Exception { assertEquals(Boolean.FALSE, boltConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); } + @Test + public void testComponentConfigOverridesTopologyConfig() throws Exception { + TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-override-test.yaml", + false, true, null, false); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + assertNotNull(topology); + topology.validate(); + + // the topology-level config carries the original value + assertEquals(Boolean.TRUE, conf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + + // evaluate the effective config the way a worker does: topology config overlaid with the + // per-component config + Map boltComponentConf = (Map) JSONValue.parse( + topology.get_bolts().get("bolt-1").get_common().get_json_conf()); + Map effectiveBoltConf = Utils.merge(conf, boltComponentConf); + assertEquals(Boolean.FALSE, effectiveBoltConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + + Map spoutComponentConf = (Map) JSONValue.parse( + topology.get_spouts().get("spout-1").get_common().get_json_conf()); + Map effectiveSpoutConf = Utils.merge(conf, spoutComponentConf); + 
assertEquals(Boolean.TRUE, effectiveSpoutConf.get(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE)); + } + @Test public void testComponentConfigWithInvalidValue() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/component-config-invalid-test.yaml", false, diff --git a/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml b/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml new file mode 100644 index 00000000000..7d08314e03b --- /dev/null +++ b/flux/flux-core/src/test/resources/configs/component-config-override-test.yaml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- + +name: "component-config-override-topology" + +# topology configuration (global scope) +config: + topology.workers: 1 + topology.tuple.compression.enable: true + +# spout definitions +spouts: + - id: "spout-1" + className: "org.apache.storm.testing.TestWordSpout" + parallelism: 1 + # no per-component override: the effective config inherits the topology-level value + +# bolt definitions +bolts: + - id: "bolt-1" + className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" + parallelism: 1 + # per-component configuration overrides the topology-level value for this bolt only + config: + topology.tuple.compression.enable: false + +#stream definitions +streams: + - from: "spout-1" + to: "bolt-1" + grouping: + type: SHUFFLE