diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index f09c0cd6eff2..fbf19dbe2613 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -136,22 +137,19 @@ public TimersImpl(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.deleteTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 4cc96ee4ad54..8f8c69b5e977 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -604,7 +605,7 @@ public String getErrorContext() { wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } private DoFnInvoker.ArgumentProvider wrapOptionsAsSetup( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 92b9f92d0594..f19a5b1cf2e3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ComparisonChain; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -168,10 +169,6 @@ void deleteTimer( /** Data about a timer as represented within {@link TimerInternals}. */ @AutoValue abstract class TimerData implements Comparable { - public enum CausedByDrain { - CAUSED_BY_DRAIN, - NORMAL - } public abstract String getTimerId(); @@ -245,7 +242,7 @@ public static TimerData of( timestamp, outputTimestamp, domain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } /** @@ -355,7 +352,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException timestamp, outputTimestamp, domain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } @Override @@ -401,8 +398,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - return TimerData.of( - timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL); + return TimerData.of(timerId, namespace, timestamp, timestamp, domain, CausedByDrain.NORMAL); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 0444048b0798..3cc7a248fda2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -47,7 +48,7 @@ public void testFiringEventTimers() throws Exception { new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData eventTimer2 = TimerData.of( ID2, @@ -55,7 +56,7 @@ public void testFiringEventTimers() throws Exception { new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(eventTimer1); underTest.setTimer(eventTimer2); @@ -128,14 +129,14 @@ public void testFiringProcessingTimeTimers() throws Exception { new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); @@ -165,46 +166,38 @@ public void testTimerOrdering() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime1 = TimerData.of( - NS1, - new Instant(19), - new Instant(19), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime1 = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTime1 = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData eventTime2 = TimerData.of( - NS1, - new Instant(29), - new Instant(29), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(eventTime1); @@ -239,18 +232,14 @@ public void testDeduplicate() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime = TimerData.of( - NS1, - new Instant(19), - new Instant(19), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(eventTime); underTest.setTimer(eventTime); underTest.setTimer(processingTime); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index e5355dd3cfe5..56f6248d747a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -49,7 +50,7 @@ public void testEncodeDecodeEqual() throws Exception { new Instant(500L), new Instant(500L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); Iterable> elements = ImmutableList.of( WindowedValues.valueInGlobalWindow(1), diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index ecaf3c9433cb..43b6a3cb0cb0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowedValue; @@ -578,7 +579,7 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc timestamp, timestamp, domain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); runner.onTimers(timers); runner.persist(); } @@ -593,7 +594,7 @@ public void fireTimers(W window, TimestampedValue... timers) throws timer.getTimestamp(), timer.getTimestamp(), timer.getValue(), - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } runner.onTimers(timerData); runner.persist(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index e13b97ee62db..92385b1c8220 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -702,7 +703,7 @@ public void onTimer(OnTimerContext context) { context.fireTimestamp(), context.timestamp(), context.timeDomain(), - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index fe95fcec2e8d..e32065f6ef56 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -318,7 +319,7 @@ public void testOnTimerCalled() { timestamp, timestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL))); + CausedByDrain.NORMAL))); } private static class TestDoFnRunner implements DoFnRunner { @@ -361,7 +362,7 @@ public void onTimer( timestamp, outputTimestamp, timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index 0e0b8ba40982..78f8322b8da4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,7 +48,7 @@ public void testTimerDataCoder() throws Exception { new Instant(0), new Instant(0), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); Coder windowCoder = IntervalWindow.getCoder(); CoderProperties.coderDecodeEncodeEqual( @@ -59,7 +60,7 @@ windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), new Instant(99), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Test @@ -73,12 +74,7 @@ public void testCompareEqual() { StateNamespace namespace = StateNamespaces.global(); TimerData timer = TimerData.of( - "id", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat( timer, @@ -89,7 +85,7 @@ public void testCompareEqual() { timestamp, timestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL))); + CausedByDrain.NORMAL))); } @Test @@ -100,18 +96,14 @@ public void testCompareByTimestamp() { TimerData firstTimer = TimerData.of( - namespace, - firstTimestamp, - firstTimestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData secondTimer = TimerData.of( namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(firstTimer, lessThan(secondTimer)); } @@ -122,22 +114,17 @@ public void testCompareByDomain() { StateNamespace namespace = StateNamespaces.global(); TimerData eventTimer = - TimerData.of( - namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, TimerData.CausedByDrain.NORMAL); + TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData procTimer = TimerData.of( - namespace, - timestamp, - timestamp, - TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME, CausedByDrain.NORMAL); TimerData synchronizedProcTimer = TimerData.of( namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(eventTimer, lessThan(procTimer)); assertThat(eventTimer, lessThan(synchronizedProcTimer)); @@ -156,18 +143,10 @@ public void testCompareByNamespace() { TimerData secondEventTime = TimerData.of( - firstWindowNs, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData thirdEventTime = TimerData.of( - secondWindowNs, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat(secondEventTime, lessThan(thirdEventTime)); } @@ -179,20 +158,10 @@ public void testCompareByTimerId() { TimerData id0Timer = TimerData.of( - "id0", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData id1Timer = TimerData.of( - "id1", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat(id0Timer, lessThan(id1Timer)); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 4819efa73f96..0f3f17c337ce 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -401,22 +402,19 @@ public TestTimers(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.deleteTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index cf9296139ddb..d8a68b8ccea6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.local.StructuralKey; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -70,21 +71,21 @@ public void setTimerAddsToBuilder() { new Instant(20145L), new Instant(20145L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); internals.setTimer(eventTimer); internals.setTimer(processingTimer); internals.setTimer(synchronizedProcessingTimer); @@ -102,21 +103,21 @@ public void deleteTimerDeletesOnBuilder() { new Instant(20145L), new Instant(20145L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); internals.deleteTimer(eventTimer); internals.deleteTimer(processingTimer); internals.deleteTimer(synchronizedProcessingTimer); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index d0da3a293dbe..53c02e796002 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.hamcrest.Matchers; @@ -111,7 +112,7 @@ public void removesOnExceptionInOnTimer() throws Exception { new Instant(0), new Instant(0), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), "", GlobalWindow.INSTANCE); } catch (Exception e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 0427c2ed5401..3375ebfa36ce 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -338,7 +339,7 @@ public void extractFiredTimersExtractsTimers() { new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TransformResult timerResult = StepTransformResult.withoutHold(downstreamProducer) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index a2e163acc648..1853344ee1ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -1003,14 +1004,14 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { new Instant(250L), new Instant(250L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData futureTimer = TimerData.of( StateNamespaces.global(), new Instant(4096L), new Instant(4096L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks( impulseBundle, @@ -1150,7 +1151,7 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); manager.updateWatermarks( impulseBundle, TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) @@ -1246,21 +1247,21 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1337,21 +1338,21 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { new Instant(999L), new Instant(999L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(-12L, VarLongCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1428,21 +1429,21 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { new Instant(999L), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1510,7 +1511,7 @@ public void processingTimeTimersCanBeReset() { new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( @@ -1519,7 +1520,7 @@ public void processingTimeTimersCanBeReset() { new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1568,7 +1569,7 @@ public void eventTimeTimersCanBeReset() { new Instant(1000L), new Instant(1000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( timerId, @@ -1576,7 +1577,7 @@ public void eventTimeTimersCanBeReset() { new Instant(2000L), new Instant(2000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1640,7 +1641,7 @@ public void inputWatermarkDuplicates() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData timer2 = TimerData.of( "a", @@ -1648,7 +1649,7 @@ public void inputWatermarkDuplicates() { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build()); // Only the last timer update should be observable @@ -1684,28 +1685,28 @@ public void timerUpdateBuilderBuildAddsAllAddedTimers() { new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData deleted = TimerData.of( StateNamespaces.global(), new Instant(24L), new Instant(24L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData completedOne = TimerData.of( StateNamespaces.global(), new Instant(1024L), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData completedTwo = TimerData.of( StateNamespaces.global(), new Instant(2048L), new Instant(2048L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate update = TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())) @@ -1728,7 +1729,7 @@ public void timerUpdateBuilderWithSetAtEndOfTime() { timerStamp, timerStamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1745,7 +1746,7 @@ public void timerUpdateBuilderWithSetPastEndOfTime() { timerStamp, timerStamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1759,11 +1760,7 @@ public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build(); @@ -1777,11 +1774,7 @@ public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build(); @@ -1795,11 +1788,7 @@ public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.setTimer(timer); @@ -1814,11 +1803,7 @@ public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.deletedTimer(timer); @@ -1833,11 +1818,7 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.withCompletedTimers(ImmutableList.of(timer)); @@ -1852,11 +1833,7 @@ public void timerUpdateWithCompletedTimersNotAddedToExisting() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); assertThat(built.getCompletedTimers(), emptyIterable()); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index e7bb4dc9c0ac..d4cd1af386e3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -1379,8 +1380,8 @@ public T getValue() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index c248259a12de..625dc590d24b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -122,12 +123,14 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by * drain happened upstream */ - boolean drainingValueFromUpstream = false; + CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } if (valueCoder instanceof KvCoder) { KvCoder kvCoder = (KvCoder) valueCoder; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 1f99d929898c..da69f1a23718 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate; @@ -124,12 +125,14 @@ public Iterable> elementsIterable() { * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry * induced by drain happened upstream */ - boolean drainingValueFromUpstream = false; + CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } InputStream inputStream = message.getData().newInput(); ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index f4eb42751f0d..c62839333c7d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -108,7 +109,7 @@ public void setTimer( timestamp, outputTimestamp, timeDomain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); setTimer(timer); } @@ -146,7 +147,7 @@ public void deleteTimer( BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 00bb282c6845..2f7a5ce54fbf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -60,8 +61,8 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 520e9b3f5745..35dc19dd7816 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -75,6 +75,7 @@ import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.WindowedValueReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -159,7 +160,7 @@ private void addTimer( type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL))) + CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) .setType(type) .setStateFamily(STATE_FAMILY); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index d30c59b2709a..e12ddd95f913 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -167,7 +168,7 @@ private TimerData timerData(IntervalWindow window, Instant timestamp, Timer.Type timestamp, timestamp, type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } private IntervalWindow window(long start, long end) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index de85ee9ede5e..4bfa6efc8880 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -76,6 +76,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -177,7 +178,7 @@ public void testTimerInternalsSetTimer() { new Instant(5000), new Instant(5000), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java index 6400bb8b10d7..06db68ccc324 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -211,7 +212,7 @@ private TimerData createTimer(long timestamp) { new Instant(timestamp), new Instant(timestamp), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } private IntervalWindow createWindow(long timestamp) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index 00614c4b7397..9d3fa9b211b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValues; @@ -463,7 +464,7 @@ public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws Exceptio BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // Set up non-empty state. We don't mock + verify calls to clear() but instead @@ -539,7 +540,7 @@ public void testCleanupWorks() throws Exception { firstWindow.maxTimestamp().plus(Duration.millis(1L)), firstWindow.maxTimestamp().plus(Duration.millis(1L)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // This should fire the timer to clean up the first window @@ -556,7 +557,7 @@ public void testCleanupWorks() throws Exception { secondWindow.maxTimestamp().plus(Duration.millis(1L)), secondWindow.maxTimestamp().plus(Duration.millis(1L)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // And this should clean up the second window diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 080be93a8955..9062c881096f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; @@ -203,7 +204,7 @@ private Windmill.Timer makeSerializedTimer( new Instant(timestamp), new Instant(timestamp), timerTypeToTimeDomain(type), - TimerData.CausedByDrain.NORMAL))) + CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -212,7 +213,7 @@ private Windmill.Timer makeSerializedTimer( private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain domain) { return TimerData.of( - ns, new Instant(timestamp), new Instant(timestamp), domain, TimerData.CausedByDrain.NORMAL); + ns, new Instant(timestamp), new Instant(timestamp), domain, CausedByDrain.NORMAL); } @Test @@ -265,8 +266,8 @@ public void testDrainPropagated() throws Exception { true); Iterator> iterator = keyedWorkItem.elementsIterable().iterator(); - Assert.assertTrue(iterator.next().causedByDrain()); - Assert.assertFalse(iterator.next().causedByDrain()); + Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, iterator.next().causedByDrain()); + Assert.assertEquals(CausedByDrain.NORMAL, iterator.next().causedByDrain()); // todo add assert for draining once timerdata is filled // (https://github.com/apache/beam/issues/36884) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 9c4e1daaf3d0..48751c57754c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -144,17 +145,13 @@ public void testTimerDataToFromTimer() { List anonymousTimers = ImmutableList.of( TimerData.of( - namespace, - timestamp, - timestamp, - timeDomain, - TimerData.CausedByDrain.NORMAL), + namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL), TimerData.of( namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); for (TimerData timer : anonymousTimers) { Instant expectedTimestamp = timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) @@ -178,7 +175,7 @@ public void testTimerDataToFromTimer() { timestamp, expectedTimestamp, timer.getDomain(), - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(computed, equalTo(expected)); } @@ -192,7 +189,7 @@ public void testTimerDataToFromTimer() { timestamp, timestamp, timeDomain, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), TimerData.of( timerId, "family", namespace, timestamp, timestamp, timeDomain), TimerData.of( @@ -201,7 +198,7 @@ public void testTimerDataToFromTimer() { timestamp, timestamp.minus(Duration.millis(1)), timeDomain, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), TimerData.of( timerId, "family", diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index d06b4e9ccbad..1284c46a99ab 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; @@ -325,7 +326,7 @@ public void testTimerTag() { new Instant(123), new Instant(456), timeDomain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java index bc4eb94758cc..970aea673304 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -29,6 +29,7 @@ import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; @@ -118,7 +119,7 @@ private void scheduleNextBundleCheck() { nextBundleCheckTime, nextBundleCheckTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java index 8391049e7b14..26cc73c76f91 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.samza.operators.Scheduler; import org.joda.time.Duration; import org.joda.time.Instant; @@ -105,7 +106,7 @@ private void scheduleNextBundleCheck() { nextBundleCheckTime, nextBundleCheckTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java index 9b19471fe7a7..1a2f82b1a0d7 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Instant; @@ -47,7 +48,7 @@ public void testCoder() throws Exception { TIMESTAMP, TIMESTAMP, TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); final String key = "timer-key"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index e9c90bbbf464..c7998a281f7c 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.samza.config.MapConfig; @@ -156,7 +157,7 @@ public void testEventTimeTimers() { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -166,7 +167,7 @@ public void testEventTimeTimers() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); timerInternalsFactory.setInputWatermark(new Instant(5)); @@ -205,7 +206,7 @@ public void testRestoreEventBufferSize() throws Exception { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); store.close(); @@ -231,7 +232,7 @@ public void testRestoreEventBufferSize() throws Exception { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); restoredTimerInternals.setTimer(timer2); // Timer 2 should be added to the Event buffer @@ -272,7 +273,7 @@ public void testRestore() throws Exception { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -282,7 +283,7 @@ public void testRestore() throws Exception { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); store.close(); @@ -328,7 +329,7 @@ public void testProcessingTimeTimers() throws IOException { new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -338,7 +339,7 @@ public void testProcessingTimeTimers() throws IOException { new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = @@ -389,7 +390,7 @@ public void testOverride() { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); // this timer should override the first timer @@ -400,7 +401,7 @@ public void testOverride() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = @@ -410,7 +411,7 @@ public void testOverride() { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer3); // this timer shouldn't override since it has a different id @@ -470,7 +471,7 @@ private void testMaxExpiredEventTimersProcessAtOnce( new Instant(i), new Instant(i), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 0be36d67388c..4bc01bd205a9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -116,8 +117,8 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java new file mode 100644 index 000000000000..809f34b365b0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +public enum CausedByDrain { + CAUSED_BY_DRAIN, + NORMAL +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java index 03e3088e5256..67473e567d63 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java @@ -48,7 +48,7 @@ public interface OutputBuilder extends WindowedValue { OutputBuilder setRecordOffset(@Nullable Long recordOffset); - OutputBuilder setCausedByDrain(boolean causedByDrain); + OutputBuilder setCausedByDrain(CausedByDrain causedByDrain); void output(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index bcd58b903171..daebeb31a39c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -52,7 +52,7 @@ public interface WindowedValue { @Nullable Long getRecordOffset(); - boolean causedByDrain(); + CausedByDrain causedByDrain(); /** * A representation of each of the actual values represented by this compressed {@link diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index b194207000ed..80331afd20be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -99,7 +99,7 @@ public static class Builder implements OutputBuilder { private @MonotonicNonNull Collection windows; private @Nullable String recordId; private @Nullable Long recordOffset; - private boolean causedByDrain; + private CausedByDrain causedByDrain = CausedByDrain.NORMAL; @Override public Builder setValue(T value) { @@ -144,7 +144,7 @@ public Builder setRecordOffset(@Nullable Long recordOffset) { } @Override - public Builder setCausedByDrain(boolean causedByDrain) { + public Builder setCausedByDrain(CausedByDrain causedByDrain) { this.causedByDrain = causedByDrain; return this; } @@ -198,7 +198,7 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { + public CausedByDrain causedByDrain() { return causedByDrain; } @@ -255,7 +255,7 @@ public String toString() { public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - return of(value, timestamp, windows, paneInfo, null, null, false); + return of(value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ @@ -266,7 +266,7 @@ public static WindowedValue of( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); @@ -285,7 +285,7 @@ static WindowedValue createWithoutValidation( Instant timestamp, Collection windows, PaneInfo paneInfo, - boolean causedByDrain) { + CausedByDrain causedByDrain) { if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); } else { @@ -299,12 +299,16 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, false); + return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, boolean causedByDrain) { + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); @@ -324,7 +328,7 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, false); + return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); } /** @@ -332,7 +336,7 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. */ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, false); + return new ValueInGlobalWindow<>(value, paneInfo, null, null, CausedByDrain.NORMAL); } /** @@ -344,7 +348,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta return valueInGlobalWindow(value); } else { return new TimestampedValueInGlobalWindow<>( - value, timestamp, PaneInfo.NO_FIRING, null, null, false); + value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); } } @@ -357,7 +361,8 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null, false); + return new TimestampedValueInGlobalWindow<>( + value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL); } } @@ -425,7 +430,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final PaneInfo paneInfo; private final @Nullable String currentRecordId; private final @Nullable Long currentRecordOffset; - private final boolean causedByDrain; + private final CausedByDrain causedByDrain; @Override public @Nullable String getRecordId() { @@ -438,7 +443,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue } @Override - public boolean causedByDrain() { + public CausedByDrain causedByDrain() { return causedByDrain; } @@ -447,7 +452,7 @@ protected SimpleWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { this.value = value; this.paneInfo = checkNotNull(paneInfo); this.currentRecordId = currentRecordId; @@ -501,7 +506,7 @@ public MinTimestampWindowedValue( PaneInfo pane, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, pane, currentRecordId, currentRecordOffset, causedByDrain); } @@ -520,7 +525,7 @@ public ValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } @@ -576,7 +581,7 @@ public TimestampedWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.timestamp = checkNotNull(timestamp); } @@ -600,7 +605,7 @@ public TimestampedValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } @@ -673,7 +678,7 @@ public TimestampedValueInSingleWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.window = checkNotNull(window); } @@ -745,7 +750,7 @@ public TimestampedValueInMultipleWindows( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.windows = checkNotNull(windows); } @@ -921,7 +926,7 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex BeamFnApi.Elements.ElementMetadata em = builder .setDrain( - windowedElem.causedByDrain() + windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN ? BeamFnApi.Elements.DrainMode.Enum.DRAINING : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING) .build(); @@ -942,15 +947,14 @@ public WindowedValue decode(InputStream inStream, Context context) Instant timestamp = InstantCoder.of().decode(inStream); Collection windows = windowsCoder.decode(inStream); PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream); - boolean causedByDrain = false; + CausedByDrain causedByDrain = CausedByDrain.NORMAL; if (isMetadataSupported() && paneInfo.isElementMetadata()) { BeamFnApi.Elements.ElementMetadata elementMetadata = BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); - boolean b = elementMetadata.hasDrain(); causedByDrain = - b - ? elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) - : false; + elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } T value = valueCoder.decode(inStream, context); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 915399311859..97b99321e5c7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -92,7 +93,7 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { PaneInfo.NO_FIRING, null, null, - true); // drain is persisted as part of metadata + CausedByDrain.CAUSED_BY_DRAIN); // drain is persisted as part of metadata Coder> windowedValueCoder = WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); @@ -104,7 +105,7 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { Assert.assertEquals(value.getValue(), decodedValue.getValue()); Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); - Assert.assertTrue(value.causedByDrain()); + Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain()); } @Test