21 changes: 18 additions & 3 deletions docs/Serialization.md
@@ -90,17 +90,32 @@ You can also enable it topology-wide (or cluster-wide via `storm.yaml`) by setti

#### Flux

> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism — `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition.
[Flux](flux.html) supports per-component configuration. In addition to parallelism, number of tasks, memory/CPU load, and groupings, each spout and bolt definition accepts a `config:` block that `FluxBuilder` applies to the underlying declarer via `addConfigurations(...)`. This is the Flux equivalent of `declarer.addConfiguration(...)`, so you can enable compression for just the components that emit large tuples:

To enable compression for a Flux topology, set it in the topology-level `config:` block:
```yaml
spouts:
- id: "file-read-spout"
className: "org.apache.storm.perf.spout.FileReadSpout"
parallelism: 1
# enable compression for this spout only
config:
topology.tuple.compression.enable: true

bolts:
- id: "split"
className: "org.apache.storm.perf.bolt.SplitSentenceBolt"
parallelism: 1
```

You can also enable it topology-wide by setting it in the topology-level `config:` block:

```yaml
config:
topology.tuple.compression.enable: true
topology.tuple.compression.threshold: 1460
```

Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
Be aware that the topology-wide form enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
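
To make the difference between the two forms concrete, here is a minimal Java sketch of how an effective setting could be resolved. It assumes, in line with Storm's documented configuration precedence, that a component-level value overrides the topology-level one, and that "exceeds" means strictly greater than. `CompressionDecision`, `effectiveEnable`, and `shouldCompress` are hypothetical names used for illustration only; they are not Storm APIs.

```java
import java.util.Map;

// Hypothetical illustration only; these are not Storm classes or methods.
public class CompressionDecision {
    static final String ENABLE = "topology.tuple.compression.enable";
    static final String THRESHOLD = "topology.tuple.compression.threshold";

    // Assumption: a component-level value, when present, wins over the topology-level one.
    public static boolean effectiveEnable(Map<String, Object> topologyConf,
                                          Map<String, Object> componentConf) {
        Object value = componentConf.containsKey(ENABLE)
                ? componentConf.get(ENABLE)
                : topologyConf.get(ENABLE);
        return Boolean.TRUE.equals(value);
    }

    // Assumption: only remote-bound tuples strictly larger than the threshold are compressed.
    public static boolean shouldCompress(boolean enabled, boolean remoteBound,
                                         int serializedBytes, int thresholdBytes) {
        return enabled && remoteBound && serializedBytes > thresholdBytes;
    }

    public static void main(String[] args) {
        // Topology-level config sets only the threshold; the spout opts in on its own.
        Map<String, Object> topologyConf = Map.of(THRESHOLD, 1460);
        Map<String, Object> spoutConf = Map.of(ENABLE, true);
        Map<String, Object> boltConf = Map.of();

        int threshold = (Integer) topologyConf.get(THRESHOLD);
        boolean spoutEnabled = effectiveEnable(topologyConf, spoutConf);
        boolean boltEnabled = effectiveEnable(topologyConf, boltConf);

        System.out.println("spout, 4096-byte remote tuple: "
                + shouldCompress(spoutEnabled, true, 4096, threshold));
        System.out.println("bolt, 4096-byte remote tuple: "
                + shouldCompress(boltEnabled, true, 4096, threshold));
    }
}
```

In this sketch only the spout's large remote-bound tuples would be compressed, which is the effect the per-component form is meant to achieve.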

#### Configuration reference

40 changes: 40 additions & 0 deletions flux/README.md
@@ -574,6 +574,17 @@ component when the topology is deployed.
Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
well.

In addition to `parallelism`, spout and bolt definitions support the following optional parameters that map directly to
the underlying Storm `BoltDeclarer`/`SpoutDeclarer`:

| Parameter | Description |
|---------------------|------------------------------------------------------------------------------------------|
| `numTasks` | The number of tasks for the component (`setNumTasks`). |
| `onHeapMemoryLoad` | The on-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
| `offHeapMemoryLoad` | The off-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
| `cpuLoad` | The CPU load for resource-aware scheduling (`setCPULoad`). |
| `config` | A map of configuration parameters applied only to this component (`addConfigurations`). |
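
As a rough sketch of how these optional parameters behave, the following Java snippet mirrors the pattern `FluxBuilder` uses in this change: a value of `-1` is treated as "not set", and the corresponding declarer setter is called only when the value is greater than `-1`. `RecordingDeclarer` is a hypothetical stand-in for Storm's declarer (with simplified `int` parameters) so that the example runs without Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class OptionalParamsSketch {

    // Hypothetical stand-in for a Storm declarer; it only records which setters were called.
    public static class RecordingDeclarer {
        public final List<String> calls = new ArrayList<>();

        public void setMemoryLoad(int onHeap) {
            calls.add("setMemoryLoad(" + onHeap + ")");
        }

        public void setMemoryLoad(int onHeap, int offHeap) {
            calls.add("setMemoryLoad(" + onHeap + ", " + offHeap + ")");
        }

        public void setCPULoad(int cpu) {
            calls.add("setCPULoad(" + cpu + ")");
        }

        public void setNumTasks(int numTasks) {
            calls.add("setNumTasks(" + numTasks + ")");
        }
    }

    // -1 means "not set in the YAML"; off-heap memory is only applied together with on-heap memory.
    public static void apply(RecordingDeclarer declarer, int onHeap, int offHeap, int cpu, int numTasks) {
        if (onHeap > -1) {
            if (offHeap > -1) {
                declarer.setMemoryLoad(onHeap, offHeap);
            } else {
                declarer.setMemoryLoad(onHeap);
            }
        }
        if (cpu > -1) {
            declarer.setCPULoad(cpu);
        }
        if (numTasks > -1) {
            declarer.setNumTasks(numTasks);
        }
    }

    public static void main(String[] args) {
        RecordingDeclarer declarer = new RecordingDeclarer();
        // onHeapMemoryLoad and cpuLoad are set; offHeapMemoryLoad and numTasks are left unset.
        apply(declarer, 256, -1, 50, -1);
        System.out.println(declarer.calls);
    }
}
```

One consequence of this pattern is that `offHeapMemoryLoad` on its own has no effect: it is only passed along when `onHeapMemoryLoad` is also set.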

Shell spout example:

```yaml
@@ -656,6 +667,35 @@ bolts:
parallelism: 1
# ...
```

### Per-Component Configuration
In addition to the topology-wide [Topology Config](#topology-config), each spout and bolt can declare its own `config`
map. These configurations are applied to that component only, via the declarer's `addConfigurations(...)` method,
following Storm's native support for component-level configuration. This avoids enabling a configuration topology-wide
when only a single component requires it.

```yaml
spouts:
- id: "sentence-spout"
className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
parallelism: 1
# configuration applied to this spout only
config:
topology.max.spout.pending: 1000

bolts:
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
# configuration applied to this bolt only
config:
topology.tuple.compression.enable: true
```

Known Storm configuration keys are validated client-side when the topology is built, so an invalid value (for example, a
non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. Unknown or custom
keys are not validated and are passed through verbatim.
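
The validate-then-apply flow described here can be sketched in plain Java. The `KNOWN_KEYS` table below is a hypothetical stand-in for Storm's `ConfigValidation`, and the `target` map stands in for the declarer. Only the overall flow (skip empty maps, validate known keys, pass unknown keys through) reflects the change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class ComponentConfigSketch {

    // Hypothetical table of known keys and value checks; these are not Storm's real validators.
    private static final Map<String, Predicate<Object>> KNOWN_KEYS = Map.of(
            "topology.tuple.compression.enable", v -> v instanceof Boolean,
            "topology.max.spout.pending", v -> v instanceof Integer && (Integer) v > 0);

    // Known keys must pass their check; unknown keys are not validated at all.
    public static void validate(Map<String, Object> config) {
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            Predicate<Object> check = KNOWN_KEYS.get(entry.getKey());
            if (check != null && !check.test(entry.getValue())) {
                throw new IllegalArgumentException(
                        "Invalid value for " + entry.getKey() + ": " + entry.getValue());
            }
        }
    }

    // Validate first, then copy the entries into the target (the stand-in for the declarer).
    public static void applyComponentConfig(Map<String, Object> config, Map<String, Object> target) {
        if (config == null || config.isEmpty()) {
            return;
        }
        validate(config);
        target.putAll(config);
    }

    public static void main(String[] args) {
        Map<String, Object> applied = new HashMap<>();
        Map<String, Object> valid = new HashMap<>();
        valid.put("topology.tuple.compression.enable", true);
        valid.put("my.custom.key", "passed through verbatim");
        applyComponentConfig(valid, applied);
        System.out.println("applied keys: " + applied.keySet());

        Map<String, Object> invalid = new HashMap<>();
        invalid.put("topology.tuple.compression.enable", "yes");
        try {
            applyComponentConfig(invalid, applied);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected at build time: " + e.getMessage());
        }
    }
}
```

Because validation runs before `putAll`, a single invalid entry means none of that component's configuration is applied in this sketch.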

## Streams and Stream Groupings
Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
a topology, with an associated Grouping definition.
89 changes: 44 additions & 45 deletions flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -48,6 +48,7 @@
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.hooks.IWorkerHook;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.ComponentConfigurationDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
@@ -57,11 +58,12 @@
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FluxBuilder {
private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
private static final Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);


/**
@@ -72,7 +74,10 @@ public class FluxBuilder {
public static Config buildConfig(TopologyDef topologyDef) {
// merge contents of `config` into topology config
Config conf = new Config();
conf.putAll(topologyDef.getConfig());
Map<String, Object> topologyConfig = topologyDef.getConfig();
// validate the topology-wide config so invalid values fail fast
ConfigValidation.validateFields(topologyConfig);
conf.putAll(topologyConfig);
return conf;
}

@@ -185,55 +190,40 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui
for (StreamDef stream : topologyDef.getStreams()) {
Object boltObj = context.getBolt(stream.getTo());
BoltDeclarer declarer = declarers.get(stream.getTo());
if (boltObj instanceof IRichBolt) {
if (declarer == null) {
declarer = builder.setBolt(stream.getTo(),
(IRichBolt) boltObj,
boolean newDeclarer = declarer == null;
if (newDeclarer) {
declarer = switch (boltObj) {
case IRichBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IBasicBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IBasicBolt) boltObj,
case IBasicBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IWindowedBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IWindowedBolt) boltObj,
case IWindowedBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
}
} else if (boltObj instanceof IStatefulBolt) {
if (declarer == null) {
declarer = builder.setBolt(
stream.getTo(),
(IStatefulBolt) boltObj,
case IStatefulBolt b -> builder.setBolt(stream.getTo(), b,
topologyDef.parallelismForBolt(stream.getTo()));
declarers.put(stream.getTo(), declarer);
default -> throw new IllegalArgumentException("Class does not appear to be a bolt: "
+ boltObj.getClass().getName());
};
// resource and config declarations apply to the bolt as a whole, so only apply them once
// when the declarer is first created rather than on every incoming stream
BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
if (boltDef.getOnHeapMemoryLoad() > -1) {
if (boltDef.getOffHeapMemoryLoad() > -1) {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
} else {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
}
}
} else {
throw new IllegalArgumentException("Class does not appear to be a bolt: "
+ boltObj.getClass().getName());
}

BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
if (boltDef.getOnHeapMemoryLoad() > -1) {
if (boltDef.getOffHeapMemoryLoad() > -1) {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
} else {
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
if (boltDef.getCpuLoad() > -1) {
declarer.setCPULoad(boltDef.getCpuLoad());
}
}
if (boltDef.getCpuLoad() > -1) {
declarer.setCPULoad(boltDef.getCpuLoad());
}
if (boltDef.getNumTasks() > -1) {
declarer.setNumTasks(boltDef.getNumTasks());
if (boltDef.getNumTasks() > -1) {
declarer.setNumTasks(boltDef.getNumTasks());
}
applyComponentConfig(boltDef.getConfig(), declarer);

// persist in the declarers cache
declarers.put(stream.getTo(), declarer);
}

GroupingDef grouping = stream.getGrouping();
@@ -456,6 +446,7 @@ private static void buildSpouts(ExecutionContext context, TopologyBuilder builde
if (sd.getNumTasks() > -1) {
declarer.setNumTasks(sd.getNumTasks());
}
applyComponentConfig(sd.getConfig(), declarer);

context.addSpout(sd.getId(), spout);
}
@@ -470,6 +461,14 @@ private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) thr
return (IRichSpout) buildObject(def, context);
}

private static void applyComponentConfig(Map<String, Object> config, ComponentConfigurationDeclarer declarer) {
if (config == null || config.isEmpty()) {
return;
}
ConfigValidation.validateFields(config);
declarer.addConfigurations(config);
}

/**
* Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
* Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.
Original file line number Diff line number Diff line change
@@ -18,6 +18,9 @@

package org.apache.storm.flux.model;

import java.util.HashMap;
import java.util.Map;

/**
* Abstract parent class of component definitions.
* (spouts/bolts)
@@ -30,6 +33,8 @@ public abstract class VertexDef extends BeanDef {
private int onHeapMemoryLoad = -1;
private int offHeapMemoryLoad = -1;
private int cpuLoad = -1;
// per-component configuration
private Map<String, Object> config = new HashMap<>();

public int getParallelism() {
return parallelism;
@@ -70,4 +75,12 @@ public int getCpuLoad() {
public void setCpuLoad(int cpuLoad) {
this.cpuLoad = cpuLoad;
}

public Map<String, Object> getConfig() {
return config;
}

public void setConfig(Map<String, Object> config) {
this.config = config;
}
}
Original file line number Diff line number Diff line change
@@ -17,8 +17,15 @@
*/
package org.apache.storm.flux;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.flux.model.TopologyDef;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FluxBuilderTest {
@@ -29,4 +36,26 @@ public void testIsPrimitiveNumber() {
assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
}

@Test
public void testBuildConfigAcceptsValidTopologyConfig() {
Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_WORKERS, 4);
TopologyDef topologyDef = new TopologyDef();
topologyDef.setConfig(config);

Config result = FluxBuilder.buildConfig(topologyDef);
assertEquals(4, result.get(Config.TOPOLOGY_WORKERS));
}

@Test
public void testBuildConfigRejectsInvalidTopologyConfig() {
// topology.workers must be a positive integer; a String value is invalid
Map<String, Object> config = new HashMap<>();
config.put(Config.TOPOLOGY_WORKERS, "not-a-number");
TopologyDef topologyDef = new TopologyDef();
topologyDef.setConfig(config);

assertThrows(IllegalArgumentException.class, () -> FluxBuilder.buildConfig(topologyDef));
}
}