Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.state.Timers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -136,22 +137,19 @@ public TimersImpl(StateNamespace namespace) {
@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.setTimer(
TimerData.of(
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL));
}

@Override
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
timerInternals.setTimer(
TimerData.of(
namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, CausedByDrain.NORMAL));
}

@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.deleteTimer(
TimerData.of(
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
TimerData.of(namespace, timestamp, timestamp, timeDomain, CausedByDrain.NORMAL));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.util.construction.SplittableParDo.ProcessKeyedElements;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
Expand Down Expand Up @@ -604,7 +605,7 @@ public String getErrorContext() {
wakeupTime,
wakeupTime,
TimeDomain.PROCESSING_TIME,
TimerInternals.TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ComparisonChain;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
Expand Down Expand Up @@ -168,10 +169,6 @@ void deleteTimer(
/** Data about a timer as represented within {@link TimerInternals}. */
@AutoValue
abstract class TimerData implements Comparable<TimerData> {
public enum CausedByDrain {
CAUSED_BY_DRAIN,
NORMAL
}

public abstract String getTimerId();

Expand Down Expand Up @@ -245,7 +242,7 @@ public static TimerData of(
timestamp,
outputTimestamp,
domain,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
}

/**
Expand Down Expand Up @@ -355,7 +352,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
timestamp,
outputTimestamp,
domain,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
}

@Override
Expand Down Expand Up @@ -401,8 +398,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(
timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL);
return TimerData.of(timerId, namespace, timestamp, timestamp, domain, CausedByDrain.NORMAL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.values.CausedByDrain;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
Expand All @@ -47,15 +48,15 @@ public void testFiringEventTimers() throws Exception {
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
TimerData eventTimer2 =
TimerData.of(
ID2,
NS1,
new Instant(29),
new Instant(29),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);

underTest.setTimer(eventTimer1);
underTest.setTimer(eventTimer2);
Expand Down Expand Up @@ -128,14 +129,14 @@ public void testFiringProcessingTimeTimers() throws Exception {
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
TimerData processingTime2 =
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);

underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);
Expand Down Expand Up @@ -165,46 +166,38 @@ public void testTimerOrdering() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime1 =
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
TimerData processingTime1 =
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
TimerData synchronizedProcessingTime1 =
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
TimerData eventTime2 =
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
TimerData processingTime2 =
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
TimerData synchronizedProcessingTime2 =
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);

underTest.setTimer(processingTime1);
underTest.setTimer(eventTime1);
Expand Down Expand Up @@ -239,18 +232,14 @@ public void testDeduplicate() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime =
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, CausedByDrain.NORMAL);
TimerData processingTime =
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
CausedByDrain.NORMAL);
underTest.setTimer(eventTime);
underTest.setTimer(eventTime);
underTest.setTimer(processingTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
Expand All @@ -49,7 +50,7 @@ public void testEncodeDecodeEqual() throws Exception {
new Instant(500L),
new Instant(500L),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
Iterable<WindowedValue<Integer>> elements =
ImmutableList.of(
WindowedValues.valueInGlobalWindow(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValueReceiver;
import org.apache.beam.sdk.util.construction.TriggerTranslation;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowedValue;
Expand Down Expand Up @@ -578,7 +579,7 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
timestamp,
timestamp,
domain,
TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
runner.onTimers(timers);
runner.persist();
}
Expand All @@ -593,7 +594,7 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
timer.getTimestamp(),
timer.getTimestamp(),
timer.getValue(),
TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
}
runner.onTimers(timerData);
runner.persist();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
Expand Down Expand Up @@ -702,7 +703,7 @@ public void onTimer(OnTimerContext context) {
context.fireTimestamp(),
context.timestamp(),
context.timeDomain(),
TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down Expand Up @@ -318,7 +319,7 @@ public void testOnTimerCalled() {
timestamp,
timestamp,
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL)));
CausedByDrain.NORMAL)));
}

private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
Expand Down Expand Up @@ -361,7 +362,7 @@ public <KeyT> void onTimer(
timestamp,
outputTimestamp,
timeDomain,
TimerData.CausedByDrain.NORMAL));
CausedByDrain.NORMAL));
}

@Override
Expand Down
Loading
Loading