From 91d3b6b1d397d30b3ee29f7cec0a3083bc4234ed Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 9 Feb 2026 13:53:28 +0100 Subject: [PATCH 1/2] move enum to top level class --- .../beam/runners/core/CausedByDrain.java | 23 +++++ .../runners/core/ReduceFnContextFactory.java | 9 +- .../SplittableParDoViaKeyedWorkItems.java | 2 +- .../beam/runners/core/TimerInternals.java | 11 +-- .../core/InMemoryTimerInternalsTest.java | 36 +++----- .../runners/core/KeyedWorkItemCoderTest.java | 2 +- .../beam/runners/core/ReduceFnTester.java | 4 +- .../runners/core/SimpleDoFnRunnerTest.java | 2 +- ...SimplePushbackSideInputDoFnRunnerTest.java | 4 +- .../beam/runners/core/TimerInternalsTest.java | 58 +++---------- .../triggers/TriggerStateMachineTester.java | 10 +-- .../direct/DirectTimerInternalsTest.java | 13 +-- ...ManagerRemovingTransformEvaluatorTest.java | 3 +- .../runners/direct/EvaluationContextTest.java | 3 +- .../runners/direct/WatermarkManagerTest.java | 85 +++++++------------ .../worker/WindmillTimerInternals.java | 5 +- .../StreamingGroupAlsoByWindowFnsTest.java | 3 +- ...gKeyedWorkItemSideInputDoFnRunnerTest.java | 3 +- .../StreamingModeExecutionContextTest.java | 3 +- .../worker/StreamingSideInputFetcherTest.java | 3 +- .../worker/UserParDoFnFactoryTest.java | 7 +- .../worker/WindmillKeyedWorkItemTest.java | 5 +- .../state/WindmillTagEncodingV1Test.java | 15 ++-- .../state/WindmillTagEncodingV2Test.java | 3 +- .../samza/runtime/ClassicBundleManager.java | 3 +- .../samza/runtime/PortableBundleManager.java | 3 +- .../samza/runtime/KeyedTimerDataTest.java | 3 +- .../SamzaTimerInternalsFactoryTest.java | 25 +++--- 28 files changed, 152 insertions(+), 194 deletions(-) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java new file mode 100644 index 000000000000..5a6572eb9cab --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +public enum CausedByDrain { + CAUSED_BY_DRAIN, + NORMAL +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index f09c0cd6eff2..c3c986f899fc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -136,22 +136,19 @@ public TimersImpl(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.deleteTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 4cc96ee4ad54..100422e04292 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -604,7 +604,7 @@ public String getErrorContext() { wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } private DoFnInvoker.ArgumentProvider wrapOptionsAsSetup( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 92b9f92d0594..a5dc233c2e36 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -168,10 +168,6 @@ void deleteTimer( /** Data about a timer as represented within {@link TimerInternals}. */ @AutoValue abstract class TimerData implements Comparable { - public enum CausedByDrain { - CAUSED_BY_DRAIN, - NORMAL - } public abstract String getTimerId(); @@ -245,7 +241,7 @@ public static TimerData of( timestamp, outputTimestamp, domain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } /** @@ -355,7 +351,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException timestamp, outputTimestamp, domain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } @Override @@ -401,8 +397,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - return TimerData.of( - timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL); + return TimerData.of(timerId, namespace, timestamp, timestamp, domain, CausedByDrain.NORMAL); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 0444048b0798..287e37e73cf9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -47,7 +47,7 @@ public void testFiringEventTimers() throws Exception { new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData eventTimer2 = TimerData.of( ID2, @@ -55,7 +55,7 @@ public void testFiringEventTimers() throws Exception { new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(eventTimer1); underTest.setTimer(eventTimer2); @@ -128,14 +128,14 @@ public void testFiringProcessingTimeTimers() throws Exception { new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); @@ -165,46 +165,38 @@ public void testTimerOrdering() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime1 = TimerData.of( - NS1, - new Instant(19), - new Instant(19), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime1 = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTime1 = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData eventTime2 = TimerData.of( - NS1, - new Instant(29), - new Instant(29), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTime2 = TimerData.of( NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(eventTime1); @@ -239,18 +231,14 @@ public void testDeduplicate() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime = TimerData.of( - NS1, - new Instant(19), - new Instant(19), - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData processingTime = TimerData.of( NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.setTimer(eventTime); underTest.setTimer(eventTime); underTest.setTimer(processingTime); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index e5355dd3cfe5..17d19e62367f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -49,7 +49,7 @@ public void testEncodeDecodeEqual() throws Exception { new Instant(500L), new Instant(500L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); Iterable> elements = ImmutableList.of( WindowedValues.valueInGlobalWindow(1), diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index ecaf3c9433cb..8a7047ed9847 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -578,7 +578,7 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc timestamp, timestamp, domain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); runner.onTimers(timers); runner.persist(); } @@ -593,7 +593,7 @@ public void fireTimers(W window, TimestampedValue... timers) throws timer.getTimestamp(), timer.getTimestamp(), timer.getValue(), - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } runner.onTimers(timerData); runner.persist(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index e13b97ee62db..8497108adc49 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -702,7 +702,7 @@ public void onTimer(OnTimerContext context) { context.fireTimestamp(), context.timestamp(), context.timeDomain(), - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index fe95fcec2e8d..61d36088b1e8 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -318,7 +318,7 @@ public void testOnTimerCalled() { timestamp, timestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL))); + CausedByDrain.NORMAL))); } private static class TestDoFnRunner implements DoFnRunner { @@ -361,7 +361,7 @@ public void onTimer( timestamp, outputTimestamp, timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index 0e0b8ba40982..7b9bc91eebfc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -47,7 +47,7 @@ public void testTimerDataCoder() throws Exception { new Instant(0), new Instant(0), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); Coder windowCoder = IntervalWindow.getCoder(); CoderProperties.coderDecodeEncodeEqual( @@ -59,7 +59,7 @@ windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), new Instant(99), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Test @@ -73,12 +73,7 @@ public void testCompareEqual() { StateNamespace namespace = StateNamespaces.global(); TimerData timer = TimerData.of( - "id", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat( timer, @@ -89,7 +84,7 @@ public void testCompareEqual() { timestamp, timestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL))); + CausedByDrain.NORMAL))); } @Test @@ -100,18 +95,14 @@ public void testCompareByTimestamp() { TimerData firstTimer = TimerData.of( - namespace, - firstTimestamp, - firstTimestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData secondTimer = TimerData.of( namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(firstTimer, lessThan(secondTimer)); } @@ -122,22 +113,17 @@ public void testCompareByDomain() { StateNamespace namespace = StateNamespaces.global(); TimerData eventTimer = - TimerData.of( - namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, TimerData.CausedByDrain.NORMAL); + TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData procTimer = TimerData.of( - namespace, - timestamp, - timestamp, - TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME, CausedByDrain.NORMAL); TimerData synchronizedProcTimer = TimerData.of( namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(eventTimer, lessThan(procTimer)); assertThat(eventTimer, lessThan(synchronizedProcTimer)); @@ -156,18 +142,10 @@ public void testCompareByNamespace() { TimerData secondEventTime = TimerData.of( - firstWindowNs, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData thirdEventTime = TimerData.of( - secondWindowNs, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat(secondEventTime, lessThan(thirdEventTime)); } @@ -179,20 +157,10 @@ public void testCompareByTimerId() { TimerData id0Timer = TimerData.of( - "id0", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerData id1Timer = TimerData.of( - "id1", - namespace, - timestamp, - timestamp, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + "id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); assertThat(id0Timer, lessThan(id1Timer)); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 4819efa73f96..af3389fc669b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; @@ -401,22 +402,19 @@ public TestTimers(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { timerInternals.setTimer( - TimerData.of( - namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { timerInternals.deleteTimer( - TimerData.of( - namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); + TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL)); } @Override diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index cf9296139ddb..cd42e35bc8a3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -70,21 +71,21 @@ public void setTimerAddsToBuilder() { new Instant(20145L), new Instant(20145L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); internals.setTimer(eventTimer); internals.setTimer(processingTimer); internals.setTimer(synchronizedProcessingTimer); @@ -102,21 +103,21 @@ public void deleteTimerDeletesOnBuilder() { new Instant(20145L), new Instant(20145L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); internals.deleteTimer(eventTimer); internals.deleteTimer(processingTimer); internals.deleteTimer(synchronizedProcessingTimer); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index d0da3a293dbe..b3df0cfd107f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -111,7 +112,7 @@ public void removesOnExceptionInOnTimer() throws Exception { new Instant(0), new Instant(0), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), "", GlobalWindow.INSTANCE); } catch (Exception e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 0427c2ed5401..05e5bfbfb070 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; @@ -338,7 +339,7 @@ public void extractFiredTimersExtractsTimers() { new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TransformResult timerResult = StepTransformResult.withoutHold(downstreamProducer) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index a2e163acc648..628e6c720bb9 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark; @@ -1003,14 +1004,14 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { new Instant(250L), new Instant(250L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData futureTimer = TimerData.of( StateNamespaces.global(), new Instant(4096L), new Instant(4096L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks( impulseBundle, @@ -1150,7 +1151,7 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); manager.updateWatermarks( impulseBundle, TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) @@ -1246,21 +1247,21 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1337,21 +1338,21 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { new Instant(999L), new Instant(999L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(-12L, VarLongCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1428,21 +1429,21 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { new Instant(999L), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1510,7 +1511,7 @@ public void processingTimeTimersCanBeReset() { new Instant(5000L), new Instant(5000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( @@ -1519,7 +1520,7 @@ public void processingTimeTimersCanBeReset() { new Instant(10000L), new Instant(10000L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1568,7 +1569,7 @@ public void eventTimeTimersCanBeReset() { new Instant(1000L), new Instant(1000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( timerId, @@ -1576,7 +1577,7 @@ public void eventTimeTimersCanBeReset() { new Instant(2000L), new Instant(2000L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1640,7 +1641,7 @@ public void inputWatermarkDuplicates() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData timer2 = TimerData.of( "a", @@ -1648,7 +1649,7 @@ public void inputWatermarkDuplicates() { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build()); // Only the last timer update should be observable @@ -1684,28 +1685,28 @@ public void timerUpdateBuilderBuildAddsAllAddedTimers() { new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData deleted = TimerData.of( StateNamespaces.global(), new Instant(24L), new Instant(24L), TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData completedOne = TimerData.of( StateNamespaces.global(), new Instant(1024L), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerData completedTwo = TimerData.of( StateNamespaces.global(), new Instant(2048L), new Instant(2048L), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdate update = TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())) @@ -1728,7 +1729,7 @@ public void timerUpdateBuilderWithSetAtEndOfTime() { timerStamp, timerStamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1745,7 +1746,7 @@ public void timerUpdateBuilderWithSetPastEndOfTime() { timerStamp, timerStamp, TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1759,11 +1760,7 @@ public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build(); @@ -1777,11 +1774,7 @@ public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build(); @@ -1795,11 +1788,7 @@ public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.setTimer(timer); @@ -1814,11 +1803,7 @@ public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.deletedTimer(timer); @@ -1833,11 +1818,7 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.withCompletedTimers(ImmutableList.of(timer)); @@ -1852,11 +1833,7 @@ public void timerUpdateWithCompletedTimersNotAddedToExisting() { Instant now = Instant.now(); TimerData timer = TimerData.of( - StateNamespaces.global(), - now, - now, - TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME, CausedByDrain.NORMAL); TimerUpdate built = builder.build(); assertThat(built.getCompletedTimers(), emptyIterable()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index f4eb42751f0d..1922a1137a93 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.function.Consumer; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; @@ -108,7 +109,7 @@ public void setTimer( timestamp, outputTimestamp, timeDomain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); setTimer(timer); } @@ -146,7 +147,7 @@ public void deleteTimer( BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 520e9b3f5745..a74aea47ffb4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.List; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -159,7 +160,7 @@ private void addTimer( type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL))) + CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) .setType(type) .setStateFamily(STATE_FAMILY); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index d30c59b2709a..0f858535fd0d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.KeyedWorkItem; @@ -167,7 +168,7 @@ private TimerData timerData(IntervalWindow window, Instant timestamp, Timer.Type timestamp, timestamp, type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } private IntervalWindow window(long start, long end) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index de85ee9ede5e..4ef441417c60 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.TimerInternals; @@ -177,7 +178,7 @@ public void testTimerInternalsSetTimer() { new Instant(5000), new Instant(5000), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java index 6400bb8b10d7..0ac23a774124 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -211,7 +212,7 @@ private TimerData createTimer(long timestamp) { new Instant(timestamp), new Instant(timestamp), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); } private IntervalWindow createWindow(long timestamp) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index 00614c4b7397..b86492c49319 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.List; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -463,7 +464,7 @@ public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws Exceptio BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // Set up non-empty state. We don't mock + verify calls to clear() but instead @@ -539,7 +540,7 @@ public void testCleanupWorks() throws Exception { firstWindow.maxTimestamp().plus(Duration.millis(1L)), firstWindow.maxTimestamp().plus(Duration.millis(1L)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // This should fire the timer to clean up the first window @@ -556,7 +557,7 @@ public void testCleanupWorks() throws Exception { secondWindow.maxTimestamp().plus(Duration.millis(1L)), secondWindow.maxTimestamp().plus(Duration.millis(1L)), TimeDomain.EVENT_TIME, - TimerData.CausedByDrain.NORMAL)) + CausedByDrain.NORMAL)) .thenReturn(null); // And this should clean up the second window diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 080be93a8955..32abfbe3a713 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Iterator; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -203,7 +204,7 @@ private Windmill.Timer makeSerializedTimer( new Instant(timestamp), new Instant(timestamp), timerTypeToTimeDomain(type), - TimerData.CausedByDrain.NORMAL))) + CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -212,7 +213,7 @@ private Windmill.Timer makeSerializedTimer( private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain domain) { return TimerData.of( - ns, new Instant(timestamp), new Instant(timestamp), domain, TimerData.CausedByDrain.NORMAL); + ns, new Instant(timestamp), new Instant(timestamp), domain, CausedByDrain.NORMAL); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 9c4e1daaf3d0..1754df5c4a29 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.List; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; @@ -144,17 +145,13 @@ public void testTimerDataToFromTimer() { List anonymousTimers = ImmutableList.of( TimerData.of( - namespace, - timestamp, - timestamp, - timeDomain, - TimerData.CausedByDrain.NORMAL), + namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL), TimerData.of( namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain, - TimerData.CausedByDrain.NORMAL)); + CausedByDrain.NORMAL)); for (TimerData timer : anonymousTimers) { Instant expectedTimestamp = timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) @@ -178,7 +175,7 @@ public void testTimerDataToFromTimer() { timestamp, expectedTimestamp, timer.getDomain(), - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertThat(computed, equalTo(expected)); } @@ -192,7 +189,7 @@ public void testTimerDataToFromTimer() { timestamp, timestamp, timeDomain, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), TimerData.of( timerId, "family", namespace, timestamp, timestamp, timeDomain), TimerData.of( @@ -201,7 +198,7 @@ public void testTimerDataToFromTimer() { timestamp, timestamp.minus(Duration.millis(1)), timeDomain, - TimerData.CausedByDrain.NORMAL), + CausedByDrain.NORMAL), TimerData.of( timerId, "family", diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index d06b4e9ccbad..cc99ac1c7741 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; @@ -325,7 +326,7 @@ public void testTimerTag() { new Instant(123), new Instant(456), timeDomain, - TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); } } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java index bc4eb94758cc..5607de9ba1d4 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import javax.annotation.Nullable; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; @@ -118,7 +119,7 @@ private void scheduleNextBundleCheck() { nextBundleCheckTime, nextBundleCheckTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java index 8391049e7b14..4fe1c81c7df3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; @@ -105,7 +106,7 @@ private void scheduleNextBundleCheck() { nextBundleCheckTime, nextBundleCheckTime, TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java index 9b19471fe7a7..ee0e837aec75 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.samza.runtime; import java.io.ByteArrayOutputStream; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; @@ -47,7 +48,7 @@ public void testCoder() throws Exception { TIMESTAMP, TIMESTAMP, TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); final String key = "timer-key"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index e9c90bbbf464..a2ef8eeed84a 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; @@ -156,7 +157,7 @@ public void testEventTimeTimers() { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -166,7 +167,7 @@ public void testEventTimeTimers() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); timerInternalsFactory.setInputWatermark(new Instant(5)); @@ -205,7 +206,7 @@ public void testRestoreEventBufferSize() throws Exception { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); store.close(); @@ -231,7 +232,7 @@ public void testRestoreEventBufferSize() throws Exception { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); restoredTimerInternals.setTimer(timer2); // Timer 2 should be added to the Event buffer @@ -272,7 +273,7 @@ public void testRestore() throws Exception { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -282,7 +283,7 @@ public void testRestore() throws Exception { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); store.close(); @@ -328,7 +329,7 @@ public void testProcessingTimeTimers() throws IOException { new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = @@ -338,7 +339,7 @@ public void testProcessingTimeTimers() throws IOException { new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = @@ -389,7 +390,7 @@ public void testOverride() { new Instant(10), new Instant(10), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer1); // this timer should override the first timer @@ -400,7 +401,7 @@ public void testOverride() { new Instant(100), new Instant(100), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = @@ -410,7 +411,7 @@ public void testOverride() { new Instant(200), new Instant(200), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer3); // this timer shouldn't override since it has a different id @@ -470,7 +471,7 @@ private void testMaxExpiredEventTimersProcessAtOnce( new Instant(i), new Instant(i), TimeDomain.EVENT_TIME, - TimerInternals.TimerData.CausedByDrain.NORMAL); + CausedByDrain.NORMAL); timerInternals.setTimer(timer); } From 4a2afe85f9f00d5827a520f3bd6a5ad7e393ecce Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 9 Feb 2026 14:14:08 +0100 Subject: [PATCH 2/2] move enum to sdk core changed boolean causedByDrain in WindowedValue interface to enum. spotless --- .../runners/core/ReduceFnContextFactory.java | 1 + .../SplittableParDoViaKeyedWorkItems.java | 1 + .../beam/runners/core/TimerInternals.java | 1 + .../core/InMemoryTimerInternalsTest.java | 1 + .../runners/core/KeyedWorkItemCoderTest.java | 1 + .../beam/runners/core/ReduceFnTester.java | 1 + .../runners/core/SimpleDoFnRunnerTest.java | 1 + ...SimplePushbackSideInputDoFnRunnerTest.java | 1 + .../beam/runners/core/TimerInternalsTest.java | 1 + .../triggers/TriggerStateMachineTester.java | 2 +- .../direct/DirectTimerInternalsTest.java | 2 +- ...ManagerRemovingTransformEvaluatorTest.java | 2 +- .../runners/direct/EvaluationContextTest.java | 2 +- .../runners/direct/WatermarkManagerTest.java | 2 +- .../runners/dataflow/BatchViewOverrides.java | 5 +- .../worker/UngroupedWindmillReader.java | 7 ++- .../worker/WindmillKeyedWorkItem.java | 7 ++- .../worker/WindmillTimerInternals.java | 2 +- .../worker/util/ValueInEmptyWindows.java | 5 +- .../StreamingGroupAlsoByWindowFnsTest.java | 2 +- ...gKeyedWorkItemSideInputDoFnRunnerTest.java | 2 +- .../StreamingModeExecutionContextTest.java | 2 +- .../worker/StreamingSideInputFetcherTest.java | 2 +- .../worker/UserParDoFnFactoryTest.java | 2 +- .../worker/WindmillKeyedWorkItemTest.java | 6 +- .../state/WindmillTagEncodingV1Test.java | 2 +- .../state/WindmillTagEncodingV2Test.java | 2 +- .../samza/runtime/ClassicBundleManager.java | 2 +- .../samza/runtime/PortableBundleManager.java | 2 +- .../samza/runtime/KeyedTimerDataTest.java | 2 +- .../SamzaTimerInternalsFactoryTest.java | 2 +- .../beam/runners/spark/util/TimerUtils.java | 5 +- .../beam/sdk/values}/CausedByDrain.java | 2 +- .../apache/beam/sdk/values/OutputBuilder.java | 2 +- .../apache/beam/sdk/values/WindowedValue.java | 2 +- .../beam/sdk/values/WindowedValues.java | 58 ++++++++++--------- .../beam/sdk/util/WindowedValueTest.java | 5 +- 37 files changed, 85 insertions(+), 62 deletions(-) rename {runners/core-java/src/main/java/org/apache/beam/runners/core => sdks/java/core/src/main/java/org/apache/beam/sdk/values}/CausedByDrain.java (95%) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index c3c986f899fc..fbf19dbe2613 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 100422e04292..8f8c69b5e977 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index a5dc233c2e36..f19a5b1cf2e3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ComparisonChain; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 287e37e73cf9..3cc7a248fda2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index 17d19e62367f..56f6248d747a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 8a7047ed9847..43b6a3cb0cb0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.TriggerTranslation; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowedValue; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 8497108adc49..92385b1c8220 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index 61d36088b1e8..e32065f6ef56 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index 7b9bc91eebfc..78f8322b8da4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index af3389fc669b..0f3f17c337ce 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -31,7 +31,6 @@ import java.util.Set; import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; @@ -49,6 +48,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index cd42e35bc8a3..d8a68b8ccea6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.when; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; @@ -31,6 +30,7 @@ import org.apache.beam.runners.local.StructuralKey; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index b3df0cfd107f..53c02e796002 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.hamcrest.Matchers; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 05e5bfbfb070..3375ebfa36ce 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; @@ -62,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 628e6c720bb9..1853344ee1ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark; @@ -62,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index e7bb4dc9c0ac..d4cd1af386e3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -1379,8 +1380,8 @@ public T getValue() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index c248259a12de..625dc590d24b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -122,12 +123,14 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by * drain happened upstream */ - boolean drainingValueFromUpstream = false; + CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } if (valueCoder instanceof KvCoder) { KvCoder kvCoder = (KvCoder) valueCoder; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 1f99d929898c..da69f1a23718 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate; @@ -124,12 +125,14 @@ public Iterable> elementsIterable() { * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry * induced by drain happened upstream */ - boolean drainingValueFromUpstream = false; + CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { BeamFnApi.Elements.ElementMetadata elementMetadata = WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING; + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } InputStream inputStream = message.getData().newInput(); ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 1922a1137a93..c62839333c7d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.function.Consumer; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 00bb282c6845..2f7a5ce54fbf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -60,8 +61,8 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index a74aea47ffb4..35dc19dd7816 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -30,7 +30,6 @@ import java.util.Collection; import java.util.List; import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -76,6 +75,7 @@ import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.WindowedValueReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index 0f858535fd0d..e12ddd95f913 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.KeyedWorkItem; @@ -51,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 4ef441417c60..4bfa6efc8880 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -41,7 +41,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.TimerInternals; @@ -77,6 +76,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java index 0ac23a774124..06db68ccc324 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java @@ -26,7 +26,6 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index b86492c49319..9d3fa9b211b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -31,7 +31,6 @@ import java.util.Collections; import java.util.List; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.StateInternals; @@ -66,6 +65,7 @@ import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.StringUtils; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValues; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 32abfbe3a713..9062c881096f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.Iterator; import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; @@ -266,8 +266,8 @@ public void testDrainPropagated() throws Exception { true); Iterator> iterator = keyedWorkItem.elementsIterable().iterator(); - Assert.assertTrue(iterator.next().causedByDrain()); - Assert.assertFalse(iterator.next().causedByDrain()); + Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, iterator.next().causedByDrain()); + Assert.assertEquals(CausedByDrain.NORMAL, iterator.next().causedByDrain()); // todo add assert for draining once timerdata is filled // (https://github.com/apache/beam/issues/36884) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 1754df5c4a29..48751c57754c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; @@ -41,6 +40,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index cc99ac1c7741..1284c46a99ab 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -29,7 +29,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; @@ -48,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java index 5607de9ba1d4..970aea673304 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -25,11 +25,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import javax.annotation.Nullable; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java index 4fe1c81c7df3..26cc73c76f91 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java @@ -19,11 +19,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.samza.operators.Scheduler; import org.joda.time.Duration; import org.joda.time.Instant; diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java index ee0e837aec75..1a2f82b1a0d7 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.samza.runtime; import java.io.ByteArrayOutputStream; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; @@ -26,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Instant; diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index a2ef8eeed84a..c7998a281f7c 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -32,7 +32,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.CausedByDrain; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.samza.config.MapConfig; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 0be36d67388c..4bc01bd205a9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; @@ -116,8 +117,8 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { - return false; + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java similarity index 95% rename from runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java index 5a6572eb9cab..809f34b365b0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/CausedByDrain.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/CausedByDrain.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.values; public enum CausedByDrain { CAUSED_BY_DRAIN, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java index 03e3088e5256..67473e567d63 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java @@ -48,7 +48,7 @@ public interface OutputBuilder extends WindowedValue { OutputBuilder setRecordOffset(@Nullable Long recordOffset); - OutputBuilder setCausedByDrain(boolean causedByDrain); + OutputBuilder setCausedByDrain(CausedByDrain causedByDrain); void output(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index bcd58b903171..daebeb31a39c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -52,7 +52,7 @@ public interface WindowedValue { @Nullable Long getRecordOffset(); - boolean causedByDrain(); + CausedByDrain causedByDrain(); /** * A representation of each of the actual values represented by this compressed {@link diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index b194207000ed..80331afd20be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -99,7 +99,7 @@ public static class Builder implements OutputBuilder { private @MonotonicNonNull Collection windows; private @Nullable String recordId; private @Nullable Long recordOffset; - private boolean causedByDrain; + private CausedByDrain causedByDrain = CausedByDrain.NORMAL; @Override public Builder setValue(T value) { @@ -144,7 +144,7 @@ public Builder setRecordOffset(@Nullable Long recordOffset) { } @Override - public Builder setCausedByDrain(boolean causedByDrain) { + public Builder setCausedByDrain(CausedByDrain causedByDrain) { this.causedByDrain = causedByDrain; return this; } @@ -198,7 +198,7 @@ public PaneInfo getPaneInfo() { } @Override - public boolean causedByDrain() { + public CausedByDrain causedByDrain() { return causedByDrain; } @@ -255,7 +255,7 @@ public String toString() { public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - return of(value, timestamp, windows, paneInfo, null, null, false); + return of(value, timestamp, windows, paneInfo, null, null, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ @@ -266,7 +266,7 @@ public static WindowedValue of( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); @@ -285,7 +285,7 @@ static WindowedValue createWithoutValidation( Instant timestamp, Collection windows, PaneInfo paneInfo, - boolean causedByDrain) { + CausedByDrain causedByDrain) { if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); } else { @@ -299,12 +299,16 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, false); + return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, boolean causedByDrain) { + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); @@ -324,7 +328,7 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, false); + return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); } /** @@ -332,7 +336,7 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. */ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, false); + return new ValueInGlobalWindow<>(value, paneInfo, null, null, CausedByDrain.NORMAL); } /** @@ -344,7 +348,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta return valueInGlobalWindow(value); } else { return new TimestampedValueInGlobalWindow<>( - value, timestamp, PaneInfo.NO_FIRING, null, null, false); + value, timestamp, PaneInfo.NO_FIRING, null, null, CausedByDrain.NORMAL); } } @@ -357,7 +361,8 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null, false); + return new TimestampedValueInGlobalWindow<>( + value, timestamp, paneInfo, null, null, CausedByDrain.NORMAL); } } @@ -425,7 +430,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final PaneInfo paneInfo; private final @Nullable String currentRecordId; private final @Nullable Long currentRecordOffset; - private final boolean causedByDrain; + private final CausedByDrain causedByDrain; @Override public @Nullable String getRecordId() { @@ -438,7 +443,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue } @Override - public boolean causedByDrain() { + public CausedByDrain causedByDrain() { return causedByDrain; } @@ -447,7 +452,7 @@ protected SimpleWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { this.value = value; this.paneInfo = checkNotNull(paneInfo); this.currentRecordId = currentRecordId; @@ -501,7 +506,7 @@ public MinTimestampWindowedValue( PaneInfo pane, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, pane, currentRecordId, currentRecordOffset, causedByDrain); } @@ -520,7 +525,7 @@ public ValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } @@ -576,7 +581,7 @@ public TimestampedWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.timestamp = checkNotNull(timestamp); } @@ -600,7 +605,7 @@ public TimestampedValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } @@ -673,7 +678,7 @@ public TimestampedValueInSingleWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.window = checkNotNull(window); } @@ -745,7 +750,7 @@ public TimestampedValueInMultipleWindows( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + CausedByDrain causedByDrain) { super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); this.windows = checkNotNull(windows); } @@ -921,7 +926,7 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex BeamFnApi.Elements.ElementMetadata em = builder .setDrain( - windowedElem.causedByDrain() + windowedElem.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN ? BeamFnApi.Elements.DrainMode.Enum.DRAINING : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING) .build(); @@ -942,15 +947,14 @@ public WindowedValue decode(InputStream inStream, Context context) Instant timestamp = InstantCoder.of().decode(inStream); Collection windows = windowsCoder.decode(inStream); PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream); - boolean causedByDrain = false; + CausedByDrain causedByDrain = CausedByDrain.NORMAL; if (isMetadataSupported() && paneInfo.isElementMetadata()) { BeamFnApi.Elements.ElementMetadata elementMetadata = BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); - boolean b = elementMetadata.hasDrain(); causedByDrain = - b - ? elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) - : false; + elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; } T value = valueCoder.decode(inStream, context); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 915399311859..97b99321e5c7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -92,7 +93,7 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { PaneInfo.NO_FIRING, null, null, - true); // drain is persisted as part of metadata + CausedByDrain.CAUSED_BY_DRAIN); // drain is persisted as part of metadata Coder> windowedValueCoder = WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); @@ -104,7 +105,7 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { Assert.assertEquals(value.getValue(), decodedValue.getValue()); Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); - Assert.assertTrue(value.causedByDrain()); + Assert.assertEquals(CausedByDrain.CAUSED_BY_DRAIN, value.causedByDrain()); } @Test