diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java index a68ab6c913ce..e919d12eaaca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java @@ -102,6 +102,10 @@ public W getWindow() { return window; } + public Coder getWindowCoder() { + return windowCoder; + } + @Override public String stringKey() { try { @@ -170,6 +174,10 @@ public W getWindow() { return window; } + public Coder getWindowCoder() { + return windowCoder; + } + public int getTriggerIndex() { return triggerIndex; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index ba5478be6c77..5d69abe8ffce 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -144,6 +144,8 @@ private StateTags() {} private interface SystemStateTag { StateTag asKind(StateKind kind); + + StateKind getKind(); } /** Create a state tag for the given id and spec. */ @@ -243,6 +245,16 @@ public static StateTag makeSystemTagInternal( return typedTag.asKind(StateKind.SYSTEM); } + /* + * Returns true if the tag is a system internal tag. + */ + public static boolean isSystemTagInternal(StateTag tag) { + if (!(tag instanceof SystemStateTag)) { + return false; + } + return StateKind.SYSTEM.equals(((SystemStateTag) tag).getKind()); + } + public static StateTag> convertToBagTagInternal( StateTag> combiningTag) { return new SimpleStateTag<>( @@ -358,6 +370,11 @@ public StateTag asKind(StateKind kind) { return new SimpleStateTag<>(id.asKind(kind), spec); } + @Override + public StateKind getKind() { + return id.kind; + } + @Override public boolean equals(@Nullable Object other) { if (!(other instanceof SimpleStateTag)) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index ec996c9ab02e..09afcadc3002 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -59,10 +59,12 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -154,13 +156,14 @@ public StreamingModeExecutionContext( String computationId, ReaderCache readerCache, Map stateNameMap, - WindmillStateCache.ForComputation stateCache, + ForComputation stateCache, MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, - boolean throwExceptionOnLargeOutput) { + boolean throwExceptionOnLargeOutput, + boolean enableWindmillTagEncodingV2) { super( counterFactory, metricsContainerRegistry, @@ -171,7 +174,10 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); - this.windmillTagEncoding = WindmillTagEncodingV1.instance(); + this.windmillTagEncoding = + enableWindmillTagEncodingV2 + ? WindmillTagEncodingV2.instance() + : WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index cb41aa1ccab4..4287188c35bb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index 59841f67347d..a979a1d982c4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -54,8 +54,11 @@ public abstract class WindmillTagEncoding { /** * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark * hold that is only freed after the timer fires. + * + * @param timerTag tag of the timer that maps to the hold. */ - public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData); + public abstract ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag); /** * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 19e31351a52b..08fb4c81df36 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -70,7 +70,8 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override - public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { tagString = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java new file mode 100644 index 000000000000..7d11a419ceef --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.io.InputStream; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowAndTriggerNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; +import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; +import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.joda.time.Instant; + +/** + * Encodes and decodes StateTags and TimerTags from and to windmill bytes. This encoding scheme + * enforces a specific lexicographical order on state tags. The ordering enables building range + * filters using the tags. + * + *

1. High-Level Tag Formats

+ * + *

State tags and Timer tags differ in structure but share common component encodings. + * + *

1.1 State Tag Encoding

+ * + *

Used for generic state variables (e.g., ValueState, BagState, etc). + * + *

+ * Format:
+ * | Encoded Namespace | Encoded Address |
+ * 
+ * + *
    + *
  • Encoded Namespace: Encodes the state namespace (see Section 2.1). + *
  • Encoded Address: Encodes the state variable address (see Section 2.3). + *
+ * + *

1.2 Timer/Timer Hold Tag Encoding

+ * + *

Specialized tags, used for timers and automatic watermark holds associated with the timers. + * + *

+ * Format:
+ * | Encoded Namespace | Tag Type | Timer Family Id | Timer Id |
+ *
+ * +-------------------+-----------------------------------------------------------+
+ * | Field             | Format                                                    |
+ * +-------------------+-----------------------------------------------------------+
+ * | Encoded Namespace | Encoded namespace (see Section 2.1).                      |
+ * +-------------------+-----------------------------------------------------------+
+ * | Tag Type          | {@code 0x03} (Single byte): System Timer/Watermark Hold   |
+ * |                   | {@code 0x04} (Single byte): User Timer/Watermark Hold     |
+ * +-------------------+-----------------------------------------------------------+
+ * | Timer Family ID   | TimerFamilyId encoded via length prefixed                 |
+ * |                   | {@code StringUtf8Coder}.                                  |
+ * +-------------------+-----------------------------------------------------------+
+ * | Timer ID          | TimerId encoded via length prefixed                       |
+ * |                   | {@code StringUtf8Coder}.                                  |
+ * +-------------------+-----------------------------------------------------------+
+ * 
+ * + *

2. Component Encodings

+ * + *

2.1 Namespace Encoding

+ * + *

Namespaces are prefixed with a byte ID to control sorting order. + * + *

+ * +---------------------------+-------------------------------------------------------------+
+ * | Namespace Type            | Format                                                      |
+ * +---------------------------+-------------------------------------------------------------+
+ * | GlobalNamespace           | | {@code 0x01} |                                            |
+ * |                           | (Single byte)                                               |
+ * +---------------------------+-------------------------------------------------------------+
+ * | WindowNamespace           | | {@code 0x10} | Encoded Window | {@code 0x01} |            |
+ * |                           | (See Section 2.2)                                           |
+ * +---------------------------+-------------------------------------------------------------+
+ * | WindowAndTriggerNamespace | | {@code 0x10} | Encoded Window | {@code 0x02} | TriggerIndex |
+ * |                           | (See Section 2.2 for Encoded Window)                        |
+ * |                           | TriggerIndex is encoded by {@code BigEndianIntegerCoder}    |
+ * +---------------------------+-------------------------------------------------------------+
+ * 
+ * + *

2.2 Window Encoding

+ * + *

2.2.1 IntervalWindow

+ * + *

IntervalWindows use a custom encoding that is different from the IntervalWindowCoder. + * + *

+ * Format:
+ * | 0x64 | End Time | Start Time |
+ * 
+ * + *
    + *
  • Prefix: {@code 0x64}. Single byte identifying Interval windows. + *
  • End Time: {@code intervalWindow.end()} encoded via {@code InstantCoder}. + *
  • Start Time: {@code intervalWindow.start()} encoded via {@code InstantCoder}. + *
+ * + *

Note: {@code InstantCoder} preserves the sort order. The encoded IntervalWindow is to + * be sorted based on {@code [End Time, Start Time]} directly without needing to decode. + * + *

2.2.2 Other Windows

+ * + *

All non-IntervalWindows use the standard window coders. + * + *

+ * Format:
+ * | 0x02 | Window |
+ * 
+ * + *
    + *
  • Prefix: {@code 0x02}. Single byte identifying non-Interval windows. + *
  • Window: The window serialized using its {@code windowCoder}. + *
+ * + *

2.3 Address Encoding

+ * + *

Combines the state type and the state identifier. + * + *

+ * Format:
+ * | State Type | Address |
+ *
+ * +------------+-----------------------------------------------------------------+
+ * | Field      | Format                                                          |
+ * +------------+-----------------------------------------------------------------+
+ * | State Type | {@code 0x01} (Single byte): System State                        |
+ * |            | {@code 0x02} (Single byte): User State                          |
+ * +------------+-----------------------------------------------------------------+
+ * | Address    | The state address (string) is encoded via length prefixed       |
+ * |            | {@code StringUtf8Coder}.                                        |
+ * +------------+-----------------------------------------------------------------+
+ * 
+ * + *

3. Tag Ordering

+ * + *

The encoding prefixes are chosen to enforce the following lexicographical sort order (lowest + * to highest): + * + *

    + *
  1. Tags in Global Namespace (Prefix {@code 0x01}) + *
  2. Tags in Non-Interval Windows (Prefix {@code 0x1002}) + *
  3. Tags in Interval Windows (Prefix {@code 0x1064}) + *
      + *
    • Sorted internally by {@code [EndTime, StartTime]}. + *
    + *
+ */ +@Internal +@ThreadSafe +public class WindmillTagEncodingV2 extends WindmillTagEncoding { + + private static final WindmillTagEncodingV2 INSTANCE = new WindmillTagEncodingV2(); + private static final int WINDOW_NAMESPACE_BYTE = 0x01; + private static final int WINDOW_AND_TRIGGER_NAMESPACE_BYTE = 0x02; + private static final int NON_GLOBAL_NAMESPACE_BYTE = 0x10; + private static final int GLOBAL_NAMESPACE_BYTE = 0x01; + private static final int SYSTEM_STATE_TAG_BYTE = 0x01; + private static final int USER_STATE_TAG_BYTE = 0x02; + private static final int SYSTEM_TIMER_BYTE = 0x03; + private static final int USER_TIMER_BYTE = 0x04; + private static final int INTERVAL_WINDOW_BYTE = 0x64; + private static final int OTHER_WINDOW_BYTE = 0x02; + + // Private constructor to prevent instantiations from outside. + private WindmillTagEncodingV2() {} + + /** {@inheritDoc} */ + @Override + public InternedByteString stateTag(StateNamespace namespace, StateTag address) { + try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { + ByteStringOutputStream stream = streamHandle.stream(); + encodeNameSpace(namespace, stream); + encodeAddress(address, stream); + return InternedByteString.of(stream.toByteStringAndReset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { + // Same encoding for timer tag and timer hold tag. + // They are put in different places and won't collide. + return timerTag; + } + + /** {@inheritDoc} */ + @Override + public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { + ByteStringOutputStream stream = streamHandle.stream(); + encodeNameSpace(timerData.getNamespace(), stream); + if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) { + stream.write(SYSTEM_TIMER_BYTE); + } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) { + stream.write(USER_TIMER_BYTE); + } else { + throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + prefix); + } + StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream); + StringUtf8Coder.of().encode(timerData.getTimerId(), stream); + return stream.toByteStringAndReset(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public TimerData windmillTimerToTimerData( + WindmillNamespacePrefix prefix, + Timer timer, + Coder windowCoder, + boolean draining) { + + InputStream stream = timer.getTag().newInput(); + + try { + StateNamespace stateNamespace = decodeNameSpace(stream, windowCoder); + int nextByte = stream.read(); + if (nextByte == SYSTEM_TIMER_BYTE) { + checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)); + } else if (nextByte == USER_TIMER_BYTE) { + checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)); + } else { + throw new IllegalStateException("Unexpected timer tag byte: " + nextByte); + } + + String timerFamilyId = StringUtf8Coder.of().decode(stream); + String timerId = StringUtf8Coder.of().decode(stream); + + Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); + Instant outputTimestamp = timestamp; + if (timer.hasMetadataTimestamp()) { + // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure + // to change the upper bound. + outputTimestamp = + WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp()); + if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) { + outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE; + } + } + + return TimerData.of( + timerId, + timerFamilyId, + stateNamespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType())); + + } catch (IOException e) { + throw new RuntimeException(e); + } + // todo add draining (https://github.com/apache/beam/issues/36884) + } + + /** @return the singleton WindmillStateTagUtil */ + public static WindmillTagEncodingV2 instance() { + return INSTANCE; + } + + private void encodeAddress(StateTag tag, ByteStringOutputStream stream) throws IOException { + if (StateTags.isSystemTagInternal(tag)) { + stream.write(SYSTEM_STATE_TAG_BYTE); + } else { + stream.write(USER_STATE_TAG_BYTE); + } + StringUtf8Coder.of().encode(tag.getId(), stream); + } + + private void encodeNameSpace(StateNamespace namespace, ByteStringOutputStream stream) + throws IOException { + if (namespace instanceof GlobalNamespace) { + stream.write(GLOBAL_NAMESPACE_BYTE); + } else if (namespace instanceof WindowNamespace) { + stream.write(NON_GLOBAL_NAMESPACE_BYTE); + encodeWindowNamespace((WindowNamespace) namespace, stream); + } else if (namespace instanceof WindowAndTriggerNamespace) { + stream.write(NON_GLOBAL_NAMESPACE_BYTE); + encodeWindowAndTriggerNamespace( + (WindowAndTriggerNamespace) namespace, stream); + } else { + throw new IllegalStateException("Unsupported namespace type: " + namespace.getClass()); + } + } + + private StateNamespace decodeNameSpace( + InputStream stream, Coder windowCoder) throws IOException { + int firstByte = stream.read(); + switch (firstByte) { + case GLOBAL_NAMESPACE_BYTE: + return StateNamespaces.global(); + case NON_GLOBAL_NAMESPACE_BYTE: + return decodeNonGlobalNamespace(stream, windowCoder); + default: + throw new IllegalStateException("Invalid first namespace byte: " + firstByte); + } + } + + private StateNamespace decodeNonGlobalNamespace( + InputStream stream, Coder windowCoder) throws IOException { + W window = decodeWindow(stream, windowCoder); + int namespaceByte = stream.read(); + switch (namespaceByte) { + case WINDOW_NAMESPACE_BYTE: + return StateNamespaces.window(windowCoder, window); + case WINDOW_AND_TRIGGER_NAMESPACE_BYTE: + Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream); + return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); + default: + throw new IllegalStateException("Invalid trigger namespace byte: " + namespaceByte); + } + } + + private W decodeWindow(InputStream stream, Coder windowCoder) + throws IOException { + int firstByte = stream.read(); + W window; + switch (firstByte) { + case INTERVAL_WINDOW_BYTE: + window = (W) decodeIntervalWindow(stream); + break; + case OTHER_WINDOW_BYTE: + window = windowCoder.decode(stream); + break; + default: + throw new IllegalStateException("Unexpected window first byte: " + firstByte); + } + return window; + } + + private IntervalWindow decodeIntervalWindow(InputStream stream) throws IOException { + Instant end = InstantCoder.of().decode(stream); + Instant start = InstantCoder.of().decode(stream); + return new IntervalWindow(start, end); + } + + private void encodeWindowNamespace( + WindowNamespace windowNamespace, ByteStringOutputStream stream) throws IOException { + encodeWindow(windowNamespace.getWindow(), windowNamespace.getWindowCoder(), stream); + stream.write(WINDOW_NAMESPACE_BYTE); + } + + private void encodeWindowAndTriggerNamespace( + WindowAndTriggerNamespace windowAndTriggerNamespace, ByteStringOutputStream stream) + throws IOException { + encodeWindow( + windowAndTriggerNamespace.getWindow(), windowAndTriggerNamespace.getWindowCoder(), stream); + stream.write(WINDOW_AND_TRIGGER_NAMESPACE_BYTE); + BigEndianIntegerCoder.of().encode(windowAndTriggerNamespace.getTriggerIndex(), stream); + } + + private void encodeWindow( + W window, Coder windowCoder, ByteStringOutputStream stream) throws IOException { + if (windowCoder instanceof IntervalWindowCoder) { + stream.write(INTERVAL_WINDOW_BYTE); + InstantCoder.of().encode(((IntervalWindow) window).end(), stream); + InstantCoder.of().encode(((IntervalWindow) window).start(), stream); + } else { + stream.write(OTHER_WINDOW_BYTE); + windowCoder.encode(window, stream); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index 269799903300..097da87fb015 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -74,6 +74,14 @@ final class ComputationWorkExecutorFactory { private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT = "throw_exceptions_on_large_output"; + // Experiment to enable tag encoding v2. + // Experiment is for testing by dataflow runner developers. + // Related logic could change anytime without notice. + // **DO NOT USE** on real workloads. + // Enabling the experiment could lead to state incompatibilities and broken jobs. + private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT = + "unstable_windmill_tag_encoding_v2"; + private final DataflowWorkerHarnessOptions options; private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory; private final ReaderCache readerCache; @@ -97,6 +105,7 @@ final class ComputationWorkExecutorFactory { private final IdGenerator idGenerator; private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; + private final boolean enableWindmillTagEncodingV2; ComputationWorkExecutorFactory( DataflowWorkerHarnessOptions options, @@ -124,6 +133,8 @@ final class ComputationWorkExecutorFactory { : StreamingDataflowWorker.MAX_SINK_BYTES; this.throwExceptionOnLargeOutput = hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT); + this.enableWindmillTagEncodingV2 = + hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT); } private static Nodes.ParallelInstructionNode extractReadNode( @@ -268,7 +279,8 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.executionStateRegistry(), globalConfigHandle, maxSinkBytes, - throwExceptionOnLargeOutput); + throwExceptionOnLargeOutput, + enableWindmillTagEncodingV2); } private DataflowMapTaskExecutor createMapTaskExecutor( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 93b279f0aec5..8372b33d81c8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -133,7 +133,8 @@ public void setUp() { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); } private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 334b9414b26b..f7364104f5db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -618,7 +618,8 @@ public void testReadUnboundedReader() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); options.setNumWorkers(5); int maxElements = 10; @@ -989,7 +990,8 @@ public void testFailedWorkItemsAbort() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); options.setNumWorkers(5); int maxElements = 100; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java new file mode 100644 index 000000000000..af9ef95410d1 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Enclosed.class) +public class WindmillTagEncodingV2Test { + + private static final IntervalWindow INTERVAL_WINDOW = + new IntervalWindow(new Instant(10), new Instant(20)); + + private static final CustomWindow CUSTOM_WINDOW = new CustomWindow(INTERVAL_WINDOW); + + private static final int TRIGGER_INDEX = 5; + + private static final StateNamespace GLOBAL_NAMESPACE = new GlobalNamespace(); + + private static final StateNamespace INTERVAL_WINDOW_NAMESPACE = + StateNamespaces.window(IntervalWindow.getCoder(), INTERVAL_WINDOW); + private static final StateNamespace INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE = + StateNamespaces.windowAndTrigger(IntervalWindow.getCoder(), INTERVAL_WINDOW, TRIGGER_INDEX); + + private static final StateNamespace OTHER_WINDOW_NAMESPACE = + StateNamespaces.window(new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW); + private static final StateNamespace OTHER_WINDOW_AND_TRIGGER_NAMESPACE = + StateNamespaces.windowAndTrigger( + new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW, TRIGGER_INDEX); + + // Generate a tag with length > 256, so length is encoded in two bytes. + private static final String TAG = + IntStream.of(300).mapToObj(i -> "a").collect(Collectors.joining()); + + private static final StateTag> USER_STATE_TAG = + StateTags.value(TAG, VarIntCoder.of()); + private static final StateTag> SYSTEM_STATE_TAG = + StateTags.makeSystemTagInternal(StateTags.value(TAG, VarIntCoder.of())); + + private static final ByteString TAG_BYTES = encode(StringUtf8Coder.of(), TAG); + + private static final ByteString SYSTEM_STATE_TAG_BYTES = + ByteString.copyFrom(new byte[] {1}) // system tag + .concat(TAG_BYTES); + private static final ByteString USER_STATE_TAG_BYTES = + ByteString.copyFrom(new byte[] {2}) // user tag + .concat(TAG_BYTES); + + private static final ByteString GLOBAL_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x1}); // global namespace + + private static final ByteString INTERVAL_WINDOW_BYTES = + ByteString.EMPTY + .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.end())) + .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.start())); + + private static final ByteString INTERVAL_WINDOW_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window + .concat(INTERVAL_WINDOW_BYTES) + .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace + + private static final ByteString INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window + .concat(INTERVAL_WINDOW_BYTES) + .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace + .concat( + ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // big endian trigger index + + private static final ByteString OTHER_WINDOW_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window + .concat(encode(new CustomWindow.CustomWindowCoder(), new CustomWindow(INTERVAL_WINDOW))) + .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace + + private static final ByteString OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window + .concat(encode(new CustomWindow.CustomWindowCoder(), new CustomWindow(INTERVAL_WINDOW))) + .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace + .concat( + ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // big endian trigger index + + private static final String TIMER_FAMILY_ID = "timerFamily"; + private static final ByteString TIMER_FAMILY_ID_BYTES = + encode(StringUtf8Coder.of(), TIMER_FAMILY_ID); + + private static final String TIMER_ID = "timerId"; + private static final ByteString TIMER_ID_BYTES = encode(StringUtf8Coder.of(), TIMER_ID); + + private static final ByteString SYSTEM_TIMER_BYTES = + ByteString.copyFrom(new byte[] {0x3}) // system timer + .concat(TIMER_FAMILY_ID_BYTES) + .concat(TIMER_ID_BYTES); + + private static final ByteString USER_TIMER_BYTES = + ByteString.copyFrom(new byte[] {0x4}) // user timer + .concat(TIMER_FAMILY_ID_BYTES) + .concat(TIMER_ID_BYTES); + + private static final ByteString SYSTEM_TIMER_BYTES_NO_FAMILY_ID = + ByteString.copyFrom(new byte[] {0x3}) // system timer + .concat(encode(StringUtf8Coder.of(), "")) + .concat(TIMER_ID_BYTES); + + private static final ByteString USER_TIMER_BYTES_NO_FAMILY_ID = + ByteString.copyFrom(new byte[] {0x4}) // user timer + .concat(encode(StringUtf8Coder.of(), "")) + .concat(TIMER_ID_BYTES); + + @RunWith(Parameterized.class) + public static class EncodeStateTagTest { + + @Parameters(name = "{index}: namespace={0} stateTag={1} expectedBytes={2}") + public static Collection data() { + return ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, USER_STATE_TAG, GLOBAL_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + GLOBAL_NAMESPACE, + SYSTEM_STATE_TAG, + GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_STATE_TAG_BYTES) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + USER_STATE_TAG, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + USER_STATE_TAG, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + USER_STATE_TAG, + OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + USER_STATE_TAG, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }); + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public StateTag stateTag; + + @Parameter(2) + public ByteString expectedBytes; + + @Test + public void testStateTag() { + assertEquals( + expectedBytes, + WindmillTagEncodingV2.instance().stateTag(namespace, stateTag).byteString()); + } + } + + @RunWith(Parameterized.class) + public static class TimerTagTest { + + @Parameters( + name = + "{index}: namespace={0} prefix={1} expectedBytes={2} includeTimerId={3}" + + " includeTimerFamilyId={4} timeDomain={4}") + public static Collection data() { + List data = new ArrayList<>(); + for (boolean includeTimerFamilyId : ImmutableList.of(true, false)) { + ByteString expectedSystemTimerBytes = + includeTimerFamilyId ? SYSTEM_TIMER_BYTES : SYSTEM_TIMER_BYTES_NO_FAMILY_ID; + ByteString expectedUserTimerBytes = + includeTimerFamilyId ? USER_TIMER_BYTES : USER_TIMER_BYTES_NO_FAMILY_ID; + List tests = + ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }); + + for (Object[] params : tests) { + for (TimeDomain timeDomain : TimeDomain.values()) { + data.add( + new Object[] {params[0], params[1], params[2], includeTimerFamilyId, timeDomain}); + } + } + } + return data; + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public WindmillNamespacePrefix prefix; + + @Parameter(2) + public ByteString expectedBytes; + + @Parameter(3) + public boolean includeTimerFamilyId; + + @Parameter(4) + public TimeDomain timeDomain; + + @Test + public void testTimerTag() { + TimerData timerData = + includeTimerFamilyId + ? TimerData.of( + TIMER_ID, + TIMER_FAMILY_ID, + namespace, + new Instant(123), + new Instant(456), + timeDomain) + : TimerData.of(TIMER_ID, namespace, new Instant(123), new Instant(456), timeDomain); + assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); + } + } + + @RunWith(Parameterized.class) + public static class TimerDataFromTimerTest { + + @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} timeDomain={5}") + public static Collection data() { + List tests = + ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + GlobalWindow.Coder.INSTANCE + }, + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + GlobalWindow.Coder.INSTANCE + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }); + + List data = new ArrayList<>(); + for (Object[] params : tests) { + for (boolean draining : ImmutableList.of(true, false)) { + for (TimeDomain timeDomain : TimeDomain.values()) { + data.add( + new Object[] {params[0], params[1], params[2], params[3], draining, timeDomain}); + } + } + } + return data; + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public WindmillNamespacePrefix prefix; + + @Parameter(2) + public ByteString timerTag; + + @Parameter(3) + public Coder windowCoder; + + @Parameter(4) + public boolean draining; + + @Parameter(5) + public TimeDomain timeDomain; + + @Test + public void testTimerDataFromTimer() { + WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance(); + Instant timestamp = Instant.now(); + Instant outputTimestamp = timestamp.plus(Duration.standardSeconds(1)); + TimerData timerData = + TimerData.of( + TIMER_ID, TIMER_FAMILY_ID, namespace, timestamp, outputTimestamp, timeDomain); + Timer timer = + Timer.newBuilder() + .setTag(timerTag) + .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) + .setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)) + .setType(timerType(timeDomain)) + .build(); + assertEquals( + timerData, encoding.windmillTimerToTimerData(prefix, timer, windowCoder, draining)); + } + } + + @RunWith(JUnit4.class) + public static class TimerHoldTagTest { + + @Test + public void testTimerHoldTagUsesTimerTag() { + TimerData timerData = + TimerData.of( + TIMER_ID, + TIMER_FAMILY_ID, + GLOBAL_NAMESPACE, + new Instant(123), + new Instant(456), + TimeDomain.EVENT_TIME); + byte[] bytes = new byte[16]; + ThreadLocalRandom.current().nextBytes(bytes); + ByteString timerTag = ByteString.copyFrom(bytes); + assertEquals( + WindmillTagEncodingV2.instance() + .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag), + timerTag); + } + } + + @RunWith(JUnit4.class) + public static class SortOrderTest { + + @Test + public void testSortOrder() { + WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance(); + + Instant baseInstant = Instant.now(); + // [5, 20) + StateNamespace interval5_20 = + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + baseInstant.plus(Duration.millis(5)), baseInstant.plus(Duration.millis(20)))); + // [10, 20) + StateNamespace interval10_20 = + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + baseInstant.plus(Duration.millis(10)), baseInstant.plus(Duration.millis(20)))); + // [20, 30) + StateNamespace interval20_30 = + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + baseInstant.plus(Duration.millis(20)), baseInstant.plus(Duration.millis(30)))); + + ByteString globalBytes = encoding.stateTag(GLOBAL_NAMESPACE, USER_STATE_TAG).byteString(); + ByteString otherWindowBytes = + encoding.stateTag(OTHER_WINDOW_NAMESPACE, USER_STATE_TAG).byteString(); + ByteString interval5_20Bytes = encoding.stateTag(interval5_20, USER_STATE_TAG).byteString(); + ByteString interval10_20Bytes = encoding.stateTag(interval10_20, USER_STATE_TAG).byteString(); + ByteString interval20_30Bytes = encoding.stateTag(interval20_30, USER_STATE_TAG).byteString(); + + // Global < Non-Interval < Interval + assertOrdered(globalBytes, otherWindowBytes); + assertOrdered(otherWindowBytes, interval5_20Bytes); + + // Interval sorting: EndTime then StartTime + // [5, 20) < [10, 20) (Same End=20, Start 5 < 10) + assertOrdered(interval5_20Bytes, interval10_20Bytes); + // [10, 20) < [20, 30) (End 20 < 30) + assertOrdered(interval10_20Bytes, interval20_30Bytes); + + assertTrue(globalBytes.startsWith(ByteString.copyFrom(new byte[] {0x01}))); + assertTrue(otherWindowBytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x02}))); + assertTrue(interval5_20Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + assertTrue(interval10_20Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + assertTrue(interval20_30Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + } + + private void assertOrdered(ByteString smaller, ByteString larger) { + assertTrue(ByteString.unsignedLexicographicalComparator().compare(smaller, larger) < 0); + } + } + + private static class CustomWindow extends IntervalWindow { + + private CustomWindow(IntervalWindow intervalWindow) { + super(intervalWindow.start(), intervalWindow.end()); + } + + private static class CustomWindowCoder extends Coder { + + @Override + public void verifyDeterministic() throws NonDeterministicException { + IntervalWindowCoder.of().verifyDeterministic(); + } + + @Override + public List> getCoderArguments() { + return IntervalWindowCoder.of().getCoderArguments(); + } + + @Override + public void encode(CustomWindow value, OutputStream outStream) throws IOException { + IntervalWindowCoder.of().encode(value, outStream); + } + + @Override + public CustomWindow decode(InputStream inStream) throws IOException { + return new CustomWindow(IntervalWindowCoder.of().decode(inStream)); + } + } + } + + private static ByteString encode(Coder coder, T value) { + try { + ByteString.Output out = ByteString.newOutput(); + coder.encode(value, out); + return out.toByteString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static Timer.Type timerType(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return Timer.Type.WATERMARK; + case PROCESSING_TIME: + return Timer.Type.REALTIME; + case SYNCHRONIZED_PROCESSING_TIME: + return Timer.Type.DEPENDENT_REALTIME; + default: + throw new IllegalArgumentException("Unrecognized TimeDomain: " + domain); + } + } +}