Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.openjdk.jmh.annotations._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._
import pekko.stream.scaladsl.{ GatherCollector, Gatherer, Keep, OneToOneGatherer, Sink, Source }

import com.typesafe.config.ConfigFactory

Expand Down Expand Up @@ -76,6 +76,103 @@ class ZipWithIndexBenchmark {
}
.toMat(Sink.ignore)(Keep.right)

/** Pairs every element with a running `Long` counter kept as `statefulMap` state and drains into `Sink.ignore`. */
private val statefulMapZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => 0L)(
    // Emit the element together with the current counter; the next state is the counter plus one.
    (counter, value) => (counter + 1, (value, counter)),
    // No trailing element is produced from the final state.
    _ => None)
  .toMat(Sink.ignore)(Keep.right)

/** Pairs every element with a running index using a `Gatherer` that pushes through its `GatherCollector`. */
private val gatherPublicZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new Gatherer[Int, (Int, Long)] {
      // Index that will be attached to the next incoming element.
      private var nextIndex = 0L

      override def apply(elem: Int, collector: GatherCollector[(Int, Long)]): Unit = {
        val current = nextIndex
        // Advance the counter before pushing, mirroring the order of the side effects.
        nextIndex = current + 1
        collector.push((elem, current))
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

/** Pairs every element with a running index using a `OneToOneGatherer` that returns its single output directly. */
private val gatherInternalOneToOneZipWithIndex = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new OneToOneGatherer[Int, (Int, Long)] {
      // Index that will be attached to the next incoming element.
      private var nextIndex = 0L

      override def applyOne(elem: Int): (Int, Long) = {
        val current = nextIndex
        nextIndex = current + 1
        (elem, current)
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

/** Adds one to every element through `statefulMap`, threading a `Unit` value as the (unused) state. */
private val statefulMapIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => ())(
    // The state is passed through untouched; only the element changes.
    (unit, value) => (unit, value + 1),
    // No trailing element is produced from the final state.
    _ => None)
  .toMat(Sink.ignore)(Keep.right)

/** Adds one to every element using a field-less `Gatherer` that pushes through its `GatherCollector`. */
private val gatherPublicIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new Gatherer[Int, Int] {
      override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
        val incremented = elem + 1
        collector.push(incremented)
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

/** Adds one to every element using a field-less `OneToOneGatherer` that returns its single output directly. */
private val gatherInternalOneToOneIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new OneToOneGatherer[Int, Int] {
      override def applyOne(elem: Int): Int = {
        val incremented = elem + 1
        incremented
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

/** Adds a running `Long` counter (narrowed with `toInt`) to every element, keeping the counter as `statefulMap` state. */
private val statefulMapCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .statefulMap(() => 0L)(
    // Emit the element offset by the current counter; the next state is the counter plus one.
    (counter, value) => (counter + 1, value + counter.toInt),
    // No trailing element is produced from the final state.
    _ => None)
  .toMat(Sink.ignore)(Keep.right)

/** Adds a running counter to every element using a `Gatherer` that pushes through its `GatherCollector`. */
private val gatherPublicCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new Gatherer[Int, Int] {
      // Offset that will be added to the next incoming element.
      private var counter = 0L

      override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
        val shifted = elem + counter.toInt
        // Advance the counter before pushing, mirroring the order of the side effects.
        counter = counter + 1
        collector.push(shifted)
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

/** Adds a running counter to every element using a `OneToOneGatherer` that returns its single output directly. */
private val gatherInternalOneToOneCountedIncrement = Source
  .repeat(1)
  .take(OperationsPerInvocation)
  .gather { () =>
    new OneToOneGatherer[Int, Int] {
      // Offset that will be added to the next incoming element.
      private var counter = 0L

      override def applyOne(elem: Int): Int = {
        val shifted = elem + counter.toInt
        counter = counter + 1
        shifted
      }
    }
  }
  .toMat(Sink.ignore)(Keep.right)

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchOldZipWithIndex(): Unit =
Expand All @@ -86,4 +183,49 @@ class ZipWithIndexBenchmark {
def benchNewZipWithIndex(): Unit = {
  // Materialize the graph, then block the calling thread until the materialized value completes.
  val completion = newZipWithIndex.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `statefulMapZipWithIndex` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapZipWithIndex(): Unit = {
  val completion = statefulMapZipWithIndex.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherPublicZipWithIndex` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicZipWithIndex(): Unit = {
  val completion = gatherPublicZipWithIndex.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherInternalOneToOneZipWithIndex` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneZipWithIndex(): Unit = {
  val completion = gatherInternalOneToOneZipWithIndex.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `statefulMapIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapIncrement(): Unit = {
  val completion = statefulMapIncrement.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherPublicIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicIncrement(): Unit = {
  val completion = gatherPublicIncrement.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherInternalOneToOneIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneIncrement(): Unit = {
  val completion = gatherInternalOneToOneIncrement.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `statefulMapCountedIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchStatefulMapCountedIncrement(): Unit = {
  val completion = statefulMapCountedIncrement.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherPublicCountedIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherPublicCountedIncrement(): Unit = {
  val completion = gatherPublicCountedIncrement.run()
  Await.result(completion, Duration.Inf)
}

/** Runs `gatherInternalOneToOneCountedIncrement` once and blocks until its materialized value completes. */
@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def benchGatherInternalOneToOneCountedIncrement(): Unit = {
  val completion = gatherInternalOneToOneCountedIncrement.run()
  Await.result(completion, Duration.Inf)
}

}
68 changes: 68 additions & 0 deletions docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# gather

Transform each input element into zero or more output elements using a stateful gatherer.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Flow.gather](Flow) { scala="#gather%5BT%5D%28create%3A%28%29%3D%3EGatherer%5BOut%2CT%5D%29%3ARepr%5BT%5D" java="#gather(org.apache.pekko.japi.function.Creator)" }

## Description

Transform each input element into zero or more output elements without requiring tuple or collection allocations
imposed by the operator API itself.

A new `Gatherer` is created for each materialization and can keep mutable state in fields or closures.
The provided `GatherCollector` can emit zero or more output elements for each input element.

The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.

The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
and are ignored on downstream cancellation and abrupt termination.

The `gather` operator adheres to the @ref:[ActorAttributes.SupervisionStrategy](../../actors.md) attribute.

For a simpler stateless mapping, use @ref:[map](map.md) or @ref:[mapConcat](mapConcat.md).

## Examples

In the first example, we use `gather` to pair each element with its index, similar to @ref:[zipWithIndex](zipWithIndex.md):

Scala
: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #zipWithIndex }

Java
: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #zipWithIndex }

In the second example, elements are buffered until a different element arrives, then emitted:

Scala
: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #bufferUntilChanged }

Java
: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #bufferUntilChanged }

In the third example, repeated incoming elements are only emitted once:

Scala
: @@snip [Gather.scala](/docs/src/test/scala/docs/stream/operators/flow/Gather.scala) { #distinctUntilChanged }

Java
: @@snip [Gather.java](/docs/src/test/java/jdocs/stream/operators/flow/Gather.java) { #distinctUntilChanged }

## Reactive Streams semantics

@@@div { .callout }

**emits** when the gatherer emits an element and downstream is ready to consume it

**backpressures** when downstream backpressures

**completes** when upstream completes and the gatherer has emitted all pending elements, including those emitted from `onComplete`

**cancels** when downstream cancels

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="foldwhile"></a>@ref[foldWhile](Source-or-Flow/foldWhile.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes or the predicate `p` returns `false`, the current value is emitted downstream.|
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
|Flow|<a name="futureflow"></a>@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Source/Flow|<a name="gather"></a>@ref[gather](Source-or-Flow/gather.md)|Transform each input element into zero or more output elements with a stateful gatherer.|
|Source/Flow|<a name="grouped"></a>@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.|
|Source/Flow|<a name="groupedadjacentby"></a>@ref[groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)|Partitions this stream into chunks by a delimiter function.|
|Source/Flow|<a name="groupedadjacentbyweighted"></a>@ref[groupedAdjacentByWeighted](Source-or-Flow/groupedAdjacentByWeighted.md)|Partitions this stream into chunks by a delimiter function and a weight limit.|
Expand Down Expand Up @@ -498,6 +499,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [futureFlow](Flow/futureFlow.md)
* [futureSink](Sink/futureSink.md)
* [futureSource](Source/futureSource.md)
* [gather](Source-or-Flow/gather.md)
* [groupBy](Source-or-Flow/groupBy.md)
* [grouped](Source-or-Flow/grouped.md)
* [groupedAdjacentBy](Source-or-Flow/groupedAdjacentBy.md)
Expand Down
113 changes: 113 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/flow/Gather.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jdocs.stream.operators.flow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.GatherCollector;
import org.apache.pekko.stream.javadsl.Gatherer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

/**
 * Example usages of the {@code gather} operator. The regions between matching {@code // #tag}
 * comments are the parts referenced as snippets from the operator documentation, so those
 * regions are left exactly as they should be rendered.
 */
public class Gather {

// Never assigned a real ActorSystem in this file.
// NOTE(review): presumably these examples are only compiled, not executed — confirm.
static final ActorSystem system = null;

/**
 * Formats every element as {@code "(elem,index)"}, where the index is a per-gatherer counter
 * that starts at 0 and is incremented after each element is pushed.
 */
static void zipWithIndex() {
// #zipWithIndex
Source.from(Arrays.asList("A", "B", "C", "D"))
.gather(
() ->
new Gatherer<String, String>() {
private long index = 0L;

@Override
public void apply(String elem, GatherCollector<String> collector) {
collector.push("(" + elem + "," + index + ")");
index += 1;
}
})
.runWith(Sink.foreach(System.out::println), system);
// prints
// (A,0)
// (B,1)
// (C,2)
// (D,3)
// #zipWithIndex
}

/**
 * Collects runs of equal consecutive elements into lists. A copy of the buffer is pushed when
 * an element differing from the buffered ones arrives, and any remaining buffered elements
 * are pushed from {@code onComplete}.
 */
static void bufferUntilChanged() {
// #bufferUntilChanged
Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.gather(
() ->
new Gatherer<String, List<String>>() {
private final List<String> buffer = new ArrayList<>();

@Override
public void apply(String elem, GatherCollector<List<String>> collector) {
if (!buffer.isEmpty() && !buffer.get(0).equals(elem)) {
collector.push(new ArrayList<>(buffer));
buffer.clear();
}
buffer.add(elem);
}

@Override
public void onComplete(GatherCollector<List<String>> collector) {
if (!buffer.isEmpty()) {
collector.push(new ArrayList<>(buffer));
}
}
})
.runWith(Sink.foreach(System.out::println), system);
// prints
// [A]
// [B, B]
// [C, C, C]
// [D]
// #bufferUntilChanged
}

/**
 * Pushes an element only when it is not equal to the most recently pushed element, so runs of
 * repeated elements are emitted once.
 */
static void distinctUntilChanged() {
// #distinctUntilChanged
Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.gather(
() ->
new Gatherer<String, String>() {
private String lastElement = null;

@Override
public void apply(String elem, GatherCollector<String> collector) {
if (!elem.equals(lastElement)) {
lastElement = elem;
collector.push(elem);
}
}
})
.runWith(Sink.foreach(System.out::println), system);
// prints
// A
// B
// C
// D
// #distinctUntilChanged
}
}
Loading
Loading