From 309d938e4e6527ea3dba2d077b234abb8409ccd3 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Thu, 18 Dec 2025 06:48:35 +0000 Subject: [PATCH 1/8] Add WindmillTagEncoding Interface --- .../worker/StreamingModeExecutionContext.java | 37 ++--- .../worker/WindmillKeyedWorkItem.java | 16 ++- .../worker/WindmillTimerInternals.java | 18 +-- .../worker/WindowingWindmillReader.java | 8 +- .../windmill/state/CachingStateTable.java | 29 ++-- .../state/WindmillCombiningState.java | 4 +- .../state/WindmillStateInternals.java | 4 +- .../windmill/state/WindmillTagEncoding.java | 131 ++++++++++++++++++ ...agUtil.java => WindmillTagEncodingV1.java} | 101 ++------------ .../StreamingGroupAlsoByWindowFnsTest.java | 12 +- ...ngGroupAlsoByWindowsReshuffleDoFnTest.java | 9 +- .../worker/WindmillKeyedWorkItemTest.java | 42 ++++-- .../state/WindmillStateCacheTest.java | 16 ++- .../state/WindmillStateInternalsTest.java | 8 +- ...st.java => WindmillTagEncodingV1Test.java} | 22 +-- 15 files changed, 279 insertions(+), 178 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java rename runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/{WindmillStateTagUtil.java => WindmillTagEncodingV1.java} (73%) rename runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/{WindmillStateTagUtilTest.java => WindmillTagEncodingV1Test.java} (93%) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index c8ff7840bd1d..d734d629711a 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -61,7 +61,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -119,6 +120,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext, Map>> sideInputCache; + private final WindmillTagEncoding windmillTagEncoding; /** * The current user-facing key for this execution context. * @@ -169,6 +171,7 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); + this.windmillTagEncoding = WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; @@ -200,6 +203,10 @@ public boolean getDrainMode() { return work != null ? 
work.getDrainMode() : false; } + public WindmillTagEncoding getWindmillTagEncoding() { + return windmillTagEncoding; + } + public boolean offsetBasedDeduplicationSupported() { return activeReader != null && activeReader.getCurrentSource().offsetBasedDeduplicationSupported(); @@ -777,7 +784,7 @@ public void start( stateReader, getWorkItem().getIsNewKey(), cacheForKey.forFamily(stateFamily), - WindmillStateTagUtil.instance(), + windmillTagEncoding, scopedReadStateSupplier); this.systemTimerInternals = @@ -786,7 +793,7 @@ public void start( WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, processingTime, watermarks, - WindmillStateTagUtil.instance(), + windmillTagEncoding, td -> {}); this.userTimerInternals = @@ -795,7 +802,7 @@ public void start( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, processingTime, watermarks, - WindmillStateTagUtil.instance(), + windmillTagEncoding, this::onUserTimerModified); this.cachedFiredSystemTimers = null; @@ -823,12 +830,11 @@ public TimerData getNextFiredTimer(Coder windowCode && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator(); } @@ -887,12 +893,11 @@ public TimerData getNextFiredUserTimer(Coder window && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index ad5f2b0dd40a..1f99d929898c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -68,6 +68,7 @@ public class WindmillKeyedWorkItem implements KeyedWorkItem private final transient Coder windowCoder; private final transient Coder> windowsCoder; private final transient Coder valueCoder; + private final WindmillTagEncoding windmillTagEncoding; public WindmillKeyedWorkItem( K key, @@ -75,12 +76,14 @@ public WindmillKeyedWorkItem( Coder windowCoder, Coder> windowsCoder, Coder valueCoder, + WindmillTagEncoding windmillTagEncoding, boolean drainMode) { this.key = key; this.workItem = workItem; this.windowCoder = windowCoder; this.windowsCoder = windowsCoder; this.valueCoder = valueCoder; + this.windmillTagEncoding = windmillTagEncoding; this.drainMode = drainMode; } @@ -98,12 +101,11 @@ public Iterable timersIterable() { .append(nonEventTimers) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - drainMode)); + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, 
+ windowCoder, + drainMode)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index e1d89fc10a17..cb41aa1ccab4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -29,7 +29,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -61,26 +61,26 @@ class WindmillTimerInternals implements TimerInternals { private final String stateFamily; private final WindmillNamespacePrefix prefix; private final Consumer onTimerModified; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; public WindmillTimerInternals( String stateFamily, // unique identifies a step WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s" Instant processingTime, Watermarks watermarks, - WindmillStateTagUtil windmillStateTagUtil, + WindmillTagEncoding windmillTagEncoding, Consumer onTimerModified) { this.watermarks = watermarks; this.processingTime = checkNotNull(processingTime); this.stateFamily = stateFamily; this.prefix = prefix; - this.windmillStateTagUtil = 
windmillStateTagUtil; + this.windmillTagEncoding = windmillTagEncoding; this.onTimerModified = onTimerModified; } public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) { return new WindmillTimerInternals( - stateFamily, prefix, processingTime, watermarks, windmillStateTagUtil, onTimerModified); + stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified); } @Override @@ -187,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { TimerData timerData = value.getKey(); Timer.Builder timer = - windmillStateTagUtil.buildWindmillTimerFromTimerData( + windmillTagEncoding.buildWindmillTimerFromTimerData( stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder()); if (value.getValue()) { @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. 
outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true); } @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index f4a6eec61cbf..7dd55d91211d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -120,7 +120,13 @@ public NativeReaderIterator>> iterator() throw final WorkItem workItem = context.getWorkItem(); KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode()); + key, + workItem, + windowCoder, + windowsCoder, + valueCoder, + context.getWindmillTagEncoding(), + context.getDrainMode()); final boolean isEmptyWorkItem = (Iterables.isEmpty(keyedWorkItem.timersIterable()) && Iterables.isEmpty(keyedWorkItem.elementsIterable())); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java index 3ea1fa876263..5144089f9ef6 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java @@ -48,7 +48,7 @@ final class CachingStateTable { private final @Nullable CachingStateTable derivedStateTable; private final boolean isNewKey; private final boolean mapStateViaMultimapState; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; private CachingStateTable(Builder builder) { this.stateTable = new HashMap<>(); @@ -60,7 +60,7 @@ private CachingStateTable(Builder builder) { this.scopedReadStateSupplier = builder.scopedReadStateSupplier; this.derivedStateTable = builder.derivedStateTable; this.mapStateViaMultimapState = builder.mapStateViaMultimapState; - this.windmillStateTagUtil = builder.windmillStateTagUtil; + this.windmillTagEncoding = builder.windmillTagEncoding; if (this.isSystemTable) { Preconditions.checkState(derivedStateTable == null); } else { @@ -74,9 +74,9 @@ static Builder builder( ForKeyAndFamily cache, boolean isNewKey, Supplier scopedReadStateSupplier, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { return new Builder( - stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil); + stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillTagEncoding); } /** @@ -114,7 +114,7 @@ private StateTag.StateBinder binderForNamespace(StateNamespace namespace, StateC public BagState bindBag(StateTag> address, Coder elemCoder) { StateTag> resolvedAddress = isSystemTable ? 
StateTags.makeSystemTagInternal(address) : address; - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, resolvedAddress); @Nullable WindmillBag bag = (WindmillBag) cache.get(namespace, encodedKey); if (bag == null) { @@ -144,7 +144,7 @@ public AbstractWindmillMap bindMap( new WindmillMapViaMultimap<>( bindMultimap(internalMultimapAddress, keyCoder, valueCoder)); } else { - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec); result = (AbstractWindmillMap) cache.get(namespace, encodedKey); if (result == null) { result = @@ -161,7 +161,7 @@ public WindmillMultimap bindMultimap( StateTag> spec, Coder keyCoder, Coder valueCoder) { - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec); WindmillMultimap result = (WindmillMultimap) cache.get(namespace, encodedKey); if (result == null) { @@ -177,8 +177,7 @@ public WindmillMultimap bindMultimap( public OrderedListState bindOrderedList( StateTag> spec, Coder elemCoder) { StateTag> specOrInternalTag = addressOrInternalTag(spec); - InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, specOrInternalTag); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, specOrInternalTag); WindmillOrderedList result = (WindmillOrderedList) cache.get(namespace, encodedKey); if (result == null) { @@ -202,7 +201,7 @@ public WatermarkHoldState bindWatermark( StateTag address, TimestampCombiner timestampCombiner) { StateTag addressOrInternalTag = addressOrInternalTag(address); InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag); + windmillTagEncoding.stateTag(namespace, addressOrInternalTag); WindmillWatermarkHold result = 
(WindmillWatermarkHold) cache.get(namespace, encodedKey); if (result == null) { @@ -231,7 +230,7 @@ public CombiningState bindCom combineFn, cache, isNewKey, - windmillStateTagUtil); + windmillTagEncoding); result.initializeForWorkItem(reader, scopedReadStateSupplier); return result; @@ -251,7 +250,7 @@ CombiningState bindCombiningValueWithContext( public ValueState bindValue(StateTag> address, Coder coder) { StateTag> addressOrInternalTag = addressOrInternalTag(address); InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag); + windmillTagEncoding.stateTag(namespace, addressOrInternalTag); WindmillValue result = (WindmillValue) cache.get(namespace, encodedKey); if (result == null) { @@ -289,7 +288,7 @@ static class Builder { private final WindmillStateCache.ForKeyAndFamily cache; private final Supplier scopedReadStateSupplier; private final boolean isNewKey; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; private boolean isSystemTable; private @Nullable CachingStateTable derivedStateTable; private boolean mapStateViaMultimapState = false; @@ -300,7 +299,7 @@ private Builder( ForKeyAndFamily cache, Supplier scopedReadStateSupplier, boolean isNewKey, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { this.stateFamily = stateFamily; this.reader = reader; this.cache = cache; @@ -308,7 +307,7 @@ private Builder( this.isNewKey = isNewKey; this.isSystemTable = true; this.derivedStateTable = null; - this.windmillStateTagUtil = windmillStateTagUtil; + this.windmillTagEncoding = windmillTagEncoding; } Builder withDerivedState(CachingStateTable derivedStateTable) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java index 9ed31f250389..3da3ed7fad1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java @@ -60,9 +60,9 @@ class WindmillCombiningState extends WindmillState CombineFn combineFn, ForKeyAndFamily cache, boolean isNewKey, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { StateTag> internalBagAddress = StateTags.convertToBagTagInternal(address); - InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress); + InternedByteString encodeKey = windmillTagEncoding.stateTag(namespace, internalBagAddress); WindmillBag bag = (WindmillBag) cache.get(namespace, encodeKey); if (bag == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java index ecf64c1fc84f..db036bee43c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java @@ -61,14 +61,14 @@ public WindmillStateInternals( WindmillStateReader reader, boolean isNewKey, WindmillStateCache.ForKeyAndFamily cache, - WindmillStateTagUtil windmillStateTagUtil, + WindmillTagEncoding windmillTagEncoding, Supplier scopedReadStateSupplier) { this.key = key; this.cache = cache; this.scopedReadStateSupplier = 
scopedReadStateSupplier; CachingStateTable.Builder builder = CachingStateTable.builder( - stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillStateTagUtil); + stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillTagEncoding); if (cache.supportMapStateViaMultimapState()) { builder = builder.withMapStateViaMultimapState(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java new file mode 100644 index 000000000000..59841f67347d --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; + +@Internal +@ThreadSafe +/* + * Windmill StateTag, TimerTag encoding interface + */ +public abstract class WindmillTagEncoding { + + protected static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); + + protected static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = + BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); + + /** Encodes state tag */ + public abstract InternedByteString stateTag(StateNamespace namespace, StateTag address); + + /** + * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark + * hold that is only freed after the timer fires. + */ + public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData); + + /** + * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and + * timestamp. + * + * <p>This is necessary because Windmill will deduplicate based only on this tag. + */ + public abstract ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData); + + /** Converts Windmill Timer to beam TimerData */ + public abstract TimerData windmillTimerToTimerData( + WindmillNamespacePrefix prefix, + Timer timer, + Coder windowCoder, + boolean draining); + + /** + * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. + * + * @return the input builder for chaining + */ + public Timer.Builder buildWindmillTimerFromTimerData( + @Nullable String stateFamily, + WindmillNamespacePrefix prefix, + TimerData timerData, + Timer.Builder builder) { + + builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); + + if (stateFamily != null) { + builder.setStateFamily(stateFamily); + } + + builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); + + // Store the output timestamp in the metadata timestamp. + Instant outputTimestamp = timerData.getOutputTimestamp(); + if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of + // the global window + // here instead.
+ outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; + } + builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); + return builder; + } + + protected static Timer.Type timerType(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return Timer.Type.WATERMARK; + case PROCESSING_TIME: + return Timer.Type.REALTIME; + case SYNCHRONIZED_PROCESSING_TIME: + return Timer.Type.DEPENDENT_REALTIME; + default: + throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); + } + } + + protected static TimeDomain timerTypeToTimeDomain(Timer.Type type) { + switch (type) { + case REALTIME: + return TimeDomain.PROCESSING_TIME; + case DEPENDENT_REALTIME: + return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + case WATERMARK: + return TimeDomain.EVENT_TIME; + default: + throw new IllegalArgumentException("Unsupported timer type " + type); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java similarity index 73% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 6f6ea02938ed..19e31351a52b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -30,42 +30,28 @@ import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; import org.joda.time.Instant; @Internal @ThreadSafe -public class WindmillStateTagUtil { - private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = - GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); - - private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = - BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); +public class WindmillTagEncodingV1 extends WindmillTagEncoding { private static final String TIMER_HOLD_PREFIX = "/h"; - private static final WindmillStateTagUtil INSTANCE = new WindmillStateTagUtil(); + private static final WindmillTagEncodingV1 INSTANCE = new WindmillTagEncodingV1(); // Private constructor to prevent instantiations from outside. - private WindmillStateTagUtil() {} + private WindmillTagEncodingV1() {} - /** - * Encodes the given namespace and address as {@code <namespace>+<address>}. The - * returned InternedByteStrings are weakly interned to reduce memory usage and reduce GC pressure. 
- */ - @VisibleForTesting - InternedByteString encodeKey(StateNamespace namespace, StateTag address) { + /** {@inheritDoc} */ + @Override + public InternedByteString stateTag(StateNamespace namespace, StateTag address) { try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys // a lot, and this leads to better performance results. See associated benchmarks. @@ -82,10 +68,8 @@ InternedByteString encodeKey(StateNamespace namespace, StateTag address) { } } - /** - * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark - * hold that is only freed after the timer fires. - */ + /** {@inheritDoc} */ + @Override public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { @@ -118,12 +102,8 @@ public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerDa return ByteString.copyFromUtf8(tagString); } - /** - * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and - * timestamp. - * - * <p>This is necessary because Windmill will deduplicate based only on this tag. - */ + /** {@inheritDoc} */ + @Override public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; if (useNewTimerTagEncoding(timerData)) { @@ -151,6 +131,8 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) return ByteString.copyFromUtf8(tagString); } + /** {@inheritDoc} */ + @Override public TimerData windmillTimerToTimerData( WindmillNamespacePrefix prefix, Timer timer, @@ -253,69 +235,12 @@ public TimerData windmillTimerToTimerData( } - /** - * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. - * - * @return the input builder for chaining - */ - public Timer.Builder buildWindmillTimerFromTimerData( - @Nullable String stateFamily, - WindmillNamespacePrefix prefix, - TimerData timerData, - Timer.Builder builder) { - - builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); - - if (stateFamily != null) { - builder.setStateFamily(stateFamily); - } - - builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); - - // Store the output timestamp in the metadata timestamp. - Instant outputTimestamp = timerData.getOutputTimestamp(); - if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of - // the global window - // here instead.
- outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; - } - builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); - return builder; - } - - private static Timer.Type timerType(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return Timer.Type.WATERMARK; - case PROCESSING_TIME: - return Timer.Type.REALTIME; - case SYNCHRONIZED_PROCESSING_TIME: - return Timer.Type.DEPENDENT_REALTIME; - default: - throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); - } - } - - private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) { - switch (type) { - case REALTIME: - return TimeDomain.PROCESSING_TIME; - case DEPENDENT_REALTIME: - return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; - case WATERMARK: - return TimeDomain.EVENT_TIME; - default: - throw new IllegalArgumentException("Unsupported timer type " + type); - } - } - private static boolean useNewTimerTagEncoding(TimerData timerData) { return !timerData.getTimerFamilyId().isEmpty(); } /** @return the singleton WindmillStateTagUtil */ - public static WindmillStateTagUtil instance() { + public static WindmillTagEncodingV1 instance() { return INSTANCE; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 1ae9678eed60..094623b81311 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -50,7 +50,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.InputMessageBundle; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -149,7 +149,7 @@ private void addTimer( .getTimersBuilder() .addTimersBuilder() .setTag( - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .timerTag( WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, TimerData.of( @@ -196,7 +196,13 @@ private WindowedValue> createValue( return new ValueInEmptyWindows<>( (KeyedWorkItem) new WindmillKeyedWorkItem<>( - KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false)); + KEY, + workItem.build(), + windowCoder, + wildcardWindowsCoder, + valueCoder, + WindmillTagEncodingV1.instance(), + false)); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java index 52c9844add86..bdeefcebb2ac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.InputMessageBundle; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.Coder; 
import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CollectionCoder; @@ -132,7 +133,13 @@ private WindowedValue> createValue( return new ValueInEmptyWindows<>( (KeyedWorkItem) new WindmillKeyedWorkItem<>( - KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false)); + KEY, + workItem.build(), + windowCoder, + wildcardWindowsCoder, + valueCoder, + WindmillTagEncodingV1.instance(), + false)); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index a722b454e38e..2227c25ef15d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -30,7 +30,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.FakeKeyedWorkItemCoder; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -78,9 +79,12 @@ public class WindmillKeyedWorkItemTest { private static final StateNamespace STATE_NAMESPACE_2 = StateNamespaces.window(WINDOW_CODER, WINDOW_2); + public WindmillTagEncoding windmillTagEncoding; + @Before public void setUp() { MockitoAnnotations.initMocks(this); + windmillTagEncoding = WindmillTagEncodingV1.instance(); } 
@Test @@ -97,7 +101,13 @@ public void testElementIteration() throws Exception { KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, false); + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + false); assertThat( keyedWorkItem.elementsIterable(), @@ -170,7 +180,8 @@ public void testTimerOrdering() throws Exception { .build(); KeyedWorkItem keyedWorkItem = - new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, false); + new WindmillKeyedWorkItem<>( + KEY, workItem, WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, windmillTagEncoding, false); assertThat( keyedWorkItem.timersIterable(), @@ -181,18 +192,17 @@ public void testTimerOrdering() throws Exception { makeTimer(STATE_NAMESPACE_1, 2, TimeDomain.PROCESSING_TIME))); } - private static Windmill.Timer makeSerializedTimer( + private Windmill.Timer makeSerializedTimer( StateNamespace ns, long timestamp, Windmill.Timer.Type type) { return Windmill.Timer.newBuilder() .setTag( - WindmillStateTagUtil.instance() - .timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - TimerData.of( - ns, - new Instant(timestamp), - new Instant(timestamp), - timerTypeToTimeDomain(type)))) + windmillTagEncoding.timerTag( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + TimerData.of( + ns, + new Instant(timestamp), + new Instant(timestamp), + timerTypeToTimeDomain(type)))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -244,7 +254,13 @@ public void testDrainPropagated() throws Exception { .build()); KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, true); + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + true); Iterator> iterator = keyedWorkItem.elementsIterable().iterator(); 
Assert.assertTrue(iterator.next().causedByDrain()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index 40b292298959..bbb8e4c93c07 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -60,6 +60,8 @@ public class WindmillStateCacheTest { private static final long MEGABYTES = 1024 * 1024; DataflowWorkerHarnessOptions options; + WindmillTagEncoding windmillTagEncoding; + private static class TestStateTag implements StateTag { final String id; @@ -150,21 +152,20 @@ private static WindmillComputationKey computationKey( return WindmillComputationKey.create(computationId, ByteString.copyFromUtf8(key), shardingKey); } - private static Optional getFromCache( + private Optional getFromCache( WindmillStateCache.ForKeyAndFamily keyCache, StateNamespace namespace, StateTag address) { return (Optional) Optional.ofNullable( - keyCache.get(namespace, WindmillStateTagUtil.instance().encodeKey(namespace, address))); + keyCache.get(namespace, windmillTagEncoding.stateTag(namespace, address))); } - private static void putInCache( + private void putInCache( WindmillStateCache.ForKeyAndFamily keyCache, StateNamespace namespace, StateTag tag, T value, long weight) { - keyCache.put( - namespace, WindmillStateTagUtil.instance().encodeKey(namespace, tag), value, weight); + keyCache.put(namespace, windmillTagEncoding.stateTag(namespace, tag), value, weight); } WindmillStateCache cache; @@ -172,6 +173,7 @@ private static void putInCache( @Before public void setUp() { options = 
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); + windmillTagEncoding = WindmillTagEncodingV1.instance(); cache = WindmillStateCache.builder().setSizeMb(400).build(); assertEquals(0, cache.getWeight()); } @@ -188,14 +190,14 @@ public void conflictingUserAndSystemTags() { WindmillValue userValue = new WindmillValue<>( StateNamespaces.global(), - WindmillStateTagUtil.instance().encodeKey(StateNamespaces.global(), userTag), + windmillTagEncoding.stateTag(StateNamespaces.global(), userTag), STATE_FAMILY, StringUtf8Coder.of(), false); WindmillValue systemValue = new WindmillValue<>( StateNamespaces.global(), - WindmillStateTagUtil.instance().encodeKey(StateNamespaces.global(), systemTag), + windmillTagEncoding.stateTag(StateNamespaces.global(), systemTag), STATE_FAMILY, StringUtf8Coder.of(), false); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 1a31e7b8d685..7a06d3a29493 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -128,6 +128,7 @@ public class WindmillStateInternalsTest { private WindmillStateInternals underTest; private WindmillStateInternals underTestNewKey; private WindmillStateInternals underTestMapViaMultimap; + private WindmillTagEncoding windmillTagEncoding; private WindmillStateCache cache; private WindmillStateCache cacheViaMultimap; @Mock private Supplier readStateSupplier; @@ -216,6 +217,7 @@ public void setUp() { public void resetUnderTest() { workToken++; + windmillTagEncoding = WindmillTagEncodingV1.instance(); underTest = new 
WindmillStateInternals<>( "dummyKey", @@ -230,7 +232,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); underTestNewKey = new WindmillStateInternals( @@ -246,7 +248,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); underTestMapViaMultimap = new WindmillStateInternals( @@ -262,7 +264,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index eb4713695dc6..73acdf937811 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -50,7 +50,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class WindmillStateTagUtilTest { +public class WindmillTagEncodingV1Test { private static final List, StateNamespace>> TEST_NAMESPACES_WITH_CODERS = ImmutableList.of( @@ -80,15 +80,15 @@ public class WindmillStateTagUtilTest { ImmutableList.of("", "foo", "this one has spaces", "this/one/has/slashes", 
"/"); @Test - public void testEncodeKey() { + public void testStateTag() { StateNamespaceForTest namespace = new StateNamespaceForTest("key"); StateTag> foo = StateTags.set("foo", VarIntCoder.of()); - InternedByteString bytes = WindmillStateTagUtil.instance().encodeKey(namespace, foo); + InternedByteString bytes = WindmillTagEncodingV1.instance().stateTag(namespace, foo); assertEquals("key+ufoo", bytes.byteString().toStringUtf8()); } @Test - public void testEncodeKeyNested() { + public void testStateTagNested() { // Hypothetical case where a namespace/tag encoding depends on a call to encodeKey // This tests if thread locals in WindmillStateUtil are not reused with nesting StateNamespaceForTest namespace1 = new StateNamespaceForTest("key"); @@ -97,7 +97,7 @@ public void testEncodeKeyNested() { new StateTag>() { @Override public void appendTo(Appendable sb) throws IOException { - WindmillStateTagUtil.instance().encodeKey(namespace1, tag1); + WindmillTagEncodingV1.instance().stateTag(namespace1, tag1); sb.append("tag2"); } @@ -121,11 +121,11 @@ public SetState bind(StateBinder binder) { new StateNamespaceForTest("key") { @Override public void appendTo(Appendable sb) throws IOException { - WindmillStateTagUtil.instance().encodeKey(namespace1, tag1); + WindmillTagEncodingV1.instance().stateTag(namespace1, tag1); sb.append("namespace2"); } }; - InternedByteString bytes = WindmillStateTagUtil.instance().encodeKey(namespace2, tag2); + InternedByteString bytes = WindmillTagEncodingV1.instance().stateTag(namespace2, tag2); assertEquals("namespace2+tag2", bytes.byteString().toStringUtf8()); } @@ -152,10 +152,10 @@ public void testTimerDataToFromTimer() { ? 
BoundedWindow.TIMESTAMP_MIN_VALUE : timer.getOutputTimestamp(); TimerData computed = - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .windmillTimerToTimerData( prefix, - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(), @@ -205,10 +205,10 @@ public void testTimerDataToFromTimer() { expectedTimestamp, timer.getDomain()); assertThat( - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .windmillTimerToTimerData( prefix, - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(), From ef14d3dcf798fe692c64aa8561f5f93c8e5e4c55 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 19 Dec 2025 13:24:23 +0000 Subject: [PATCH 2/8] Add WindmillTagEncodingV2. This adds a new way of encoding windmill state tags and timer tags. The new code is behind an unstable experiment. More work is needed before it can be used on real workloads. 
--- .../beam/runners/core/StateNamespaces.java | 8 + .../apache/beam/runners/core/StateTags.java | 17 + .../worker/StreamingModeExecutionContext.java | 12 +- .../worker/WindmillTimerInternals.java | 6 +- .../windmill/state/WindmillTagEncoding.java | 3 +- .../windmill/state/WindmillTagEncodingV1.java | 3 +- .../windmill/state/WindmillTagEncodingV2.java | 263 +++++++++ .../ComputationWorkExecutorFactory.java | 14 +- .../StreamingGroupAlsoByWindowFnsTest.java | 3 + .../StreamingModeExecutionContextTest.java | 3 +- .../worker/WorkerCustomSourcesTest.java | 6 +- .../state/WindmillStateInternalsTest.java | 10 +- .../state/WindmillTagEncodingV2Test.java | 514 ++++++++++++++++++ 13 files changed, 848 insertions(+), 14 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java index a68ab6c913ce..e919d12eaaca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java @@ -102,6 +102,10 @@ public W getWindow() { return window; } + public Coder getWindowCoder() { + return windowCoder; + } + @Override public String stringKey() { try { @@ -170,6 +174,10 @@ public W getWindow() { return window; } + public Coder getWindowCoder() { + return windowCoder; + } + public int getTriggerIndex() { return triggerIndex; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index ba5478be6c77..5d69abe8ffce 100644 
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -144,6 +144,8 @@ private StateTags() {} private interface SystemStateTag { StateTag asKind(StateKind kind); + + StateKind getKind(); } /** Create a state tag for the given id and spec. */ @@ -243,6 +245,16 @@ public static StateTag makeSystemTagInternal( return typedTag.asKind(StateKind.SYSTEM); } + /* + * Returns true if the tag is a system internal tag. + */ + public static boolean isSystemTagInternal(StateTag tag) { + if (!(tag instanceof SystemStateTag)) { + return false; + } + return StateKind.SYSTEM.equals(((SystemStateTag) tag).getKind()); + } + public static StateTag> convertToBagTagInternal( StateTag> combiningTag) { return new SimpleStateTag<>( @@ -358,6 +370,11 @@ public StateTag asKind(StateKind kind) { return new SimpleStateTag<>(id.asKind(kind), spec); } + @Override + public StateKind getKind() { + return id.kind; + } + @Override public boolean equals(@Nullable Object other) { if (!(other instanceof SimpleStateTag)) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index d734d629711a..0c75e6d7ac1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -59,10 +59,12 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -154,13 +156,14 @@ public StreamingModeExecutionContext( String computationId, ReaderCache readerCache, Map stateNameMap, - WindmillStateCache.ForComputation stateCache, + ForComputation stateCache, MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, - boolean throwExceptionOnLargeOutput) { + boolean throwExceptionOnLargeOutput, + boolean enableWindmillTagEncodingV2) { super( counterFactory, metricsContainerRegistry, @@ -171,7 +174,10 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); - this.windmillTagEncoding = WindmillTagEncodingV1.instance(); + this.windmillTagEncoding = + enableWindmillTagEncodingV2 + ? 
WindmillTagEncodingV2.instance() + : WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index cb41aa1ccab4..4287188c35bb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. 
outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag())) .setStateFamily(stateFamily) .setReset(true); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index 59841f67347d..caef4b292d82 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -55,7 +55,8 @@ public abstract class WindmillTagEncoding { * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark * hold that is only freed after the timer fires. 
*/ - public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData); + public abstract ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag); /** * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 19e31351a52b..08fb4c81df36 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -70,7 +70,8 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag address /** {@inheritDoc} */ @Override - public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) { + public ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { tagString = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java new file mode 100644 index 000000000000..ba9998159177 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.io.InputStream; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowAndTriggerNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; +import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; +import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import 
org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.joda.time.Instant; + +/** Encodes and decodes StateTags and TimerTags from and to windmill bytes */ +@Internal +@ThreadSafe +public class WindmillTagEncodingV2 extends WindmillTagEncoding { + + private static final WindmillTagEncodingV2 INSTANCE = new WindmillTagEncodingV2(); + + // Private constructor to prevent instantiations from outside. + private WindmillTagEncodingV2() {} + + /** {@inheritDoc} */ + @Override + public InternedByteString stateTag(StateNamespace namespace, StateTag address) { + try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { + ByteStringOutputStream stream = streamHandle.stream(); + encodeNameSpace(namespace, stream); + encodeAddress(address, stream); + return InternedByteString.of(stream.toByteStringAndReset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public ByteString timerHoldTag( + WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) { + // Same encoding for timer tag and timer hold tag. + // They are put in different places and won't collide. 
+ return timerTag; + } + + /** {@inheritDoc} */ + @Override + public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { + ByteStringOutputStream stream = streamHandle.stream(); + encodeNameSpace(timerData.getNamespace(), stream); + if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) { + stream.write(0x03); // System namespace prefix + } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) { + stream.write(0x04); // User namespace prefix + } else { + throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + prefix); + } + StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream); + StringUtf8Coder.of().encode(timerData.getTimerId(), stream); + return stream.toByteStringAndReset(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public TimerData windmillTimerToTimerData( + WindmillNamespacePrefix prefix, + Timer timer, + Coder windowCoder, + boolean draining) { + + InputStream stream = timer.getTag().newInput(); + + try { + StateNamespace stateNamespace = decodeNameSpace(stream, windowCoder); + int nextByte = stream.read(); + if (nextByte == 0x03) { // System namespace prefix + checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)); + } else if (nextByte == 0x04) { // User namespace prefix + checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)); + } else { + throw new IllegalStateException("Unexpected timer tag byte: " + nextByte); + } + + String timerFamilyId = StringUtf8Coder.of().decode(stream); + String timerId = StringUtf8Coder.of().decode(stream); + + Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); + Instant outputTimestamp = timestamp; + if (timer.hasMetadataTimestamp()) { + // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure 
+ // to change the upper bound. + outputTimestamp = + WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp()); + if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) { + outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE; + } + } + + return TimerData.of( + timerId, + timerFamilyId, + stateNamespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType())); + + } catch (IOException e) { + throw new RuntimeException(e); + } + // todo add draining (https://github.com/apache/beam/issues/36884) + } + + /** @return the singleton WindmillTagEncodingV2 */ + public static WindmillTagEncodingV2 instance() { + return INSTANCE; + } + + private void encodeAddress(StateTag tag, ByteStringOutputStream stream) throws IOException { + if (StateTags.isSystemTagInternal(tag)) { + stream.write(0x01); // System tag + } else { + stream.write(0x02); // User tag + } + StringUtf8Coder.of().encode(tag.getId(), stream); + } + + private void encodeNameSpace(StateNamespace namespace, ByteStringOutputStream stream) + throws IOException { + if (namespace instanceof GlobalNamespace) { + // Single byte 0x01 for GlobalNamespace. + stream.write(0x01); + } else if (namespace instanceof WindowNamespace) { + // Single byte 0x10 for Non-Global namespace. + stream.write(0x10); + encodeWindowNamespace((WindowNamespace) namespace, stream); + } else if (namespace instanceof WindowAndTriggerNamespace) { + // Single byte 0x10 for Non-Global namespace.
+ stream.write(0x10); + encodeWindowAndTriggerNamespace( + (WindowAndTriggerNamespace) namespace, stream); + } else { + throw new IllegalStateException("Unsupported namespace type: " + namespace.getClass()); + } + } + + private StateNamespace decodeNameSpace( + InputStream stream, Coder windowCoder) throws IOException { + int firstByte = stream.read(); + switch (firstByte) { + case 0x01: // GlobalNamespace + return StateNamespaces.global(); + case 0x10: // Non-Global namespace + return decodeNonGlobalNamespace(stream, windowCoder); + default: + throw new IllegalStateException("Invalid first namespace byte: " + firstByte); + } + } + + private StateNamespace decodeNonGlobalNamespace( + InputStream stream, Coder windowCoder) throws IOException { + W window = decodeWindow(stream, windowCoder); + switch (stream.read()) { + case 0x1: // Window namespace + return StateNamespaces.window(windowCoder, window); + case 0x2: // Window and trigger namespace + Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream); + return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); + default: + throw new IllegalStateException("Invalid trigger namespace byte: " + stream.read()); + } + } + + private W decodeWindow(InputStream stream, Coder windowCoder) + throws IOException { + int firstByte = stream.read(); + W window; + switch (firstByte) { + case 0x64: // IntervalWindow + window = (W) decodeIntervalWindow(stream); + break; + case 0x02: // Other Windows + window = windowCoder.decode(stream); + break; + default: + throw new IllegalStateException("Unexpected window first byte: " + firstByte); + } + return window; + } + + private IntervalWindow decodeIntervalWindow(InputStream stream) throws IOException { + Instant end = InstantCoder.of().decode(stream); + Instant start = InstantCoder.of().decode(stream); + return new IntervalWindow(start, end); + } + + private void encodeWindowNamespace( + WindowNamespace windowNamespace, ByteStringOutputStream stream) throws 
IOException { + encodeWindow(windowNamespace.getWindow(), windowNamespace.getWindowCoder(), stream); + stream.write(0x01); // Window namespace + } + + private void encodeWindowAndTriggerNamespace( + WindowAndTriggerNamespace windowAndTriggerNamespace, ByteStringOutputStream stream) + throws IOException { + encodeWindow( + windowAndTriggerNamespace.getWindow(), windowAndTriggerNamespace.getWindowCoder(), stream); + stream.write(0x02); // Window and trigger namespace + BigEndianIntegerCoder.of().encode(windowAndTriggerNamespace.getTriggerIndex(), stream); + } + + private void encodeWindow( + W window, Coder windowCoder, ByteStringOutputStream stream) throws IOException { + if (windowCoder instanceof IntervalWindowCoder) { + stream.write(0x64); // IntervalWindow + InstantCoder.of().encode(((IntervalWindow) window).end(), stream); + InstantCoder.of().encode(((IntervalWindow) window).start(), stream); + } else { + stream.write(0x02); // Other Windows + windowCoder.encode(window, stream); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index 269799903300..097da87fb015 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -74,6 +74,14 @@ final class ComputationWorkExecutorFactory { private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT = "throw_exceptions_on_large_output"; + // Experiment to enable tag encoding v2. + // Experiment is for testing by dataflow runner developers. 
+ // Related logic could change anytime without notice. + // **DO NOT USE** on real workloads. + // Enabling the experiment could lead to state incompatibilities and broken jobs. + private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT = + "unstable_windmill_tag_encoding_v2"; + private final DataflowWorkerHarnessOptions options; private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory; private final ReaderCache readerCache; @@ -97,6 +105,7 @@ final class ComputationWorkExecutorFactory { private final IdGenerator idGenerator; private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; + private final boolean enableWindmillTagEncodingV2; ComputationWorkExecutorFactory( DataflowWorkerHarnessOptions options, @@ -124,6 +133,8 @@ final class ComputationWorkExecutorFactory { : StreamingDataflowWorker.MAX_SINK_BYTES; this.throwExceptionOnLargeOutput = hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT); + this.enableWindmillTagEncodingV2 = + hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT); } private static Nodes.ParallelInstructionNode extractReadNode( @@ -268,7 +279,8 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.executionStateRegistry(), globalConfigHandle, maxSinkBytes, - throwExceptionOnLargeOutput); + throwExceptionOnLargeOutput, + enableWindmillTagEncodingV2); } private DataflowMapTaskExecutor createMapTaskExecutor( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 094623b81311..6992889469f0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -92,6 +92,7 @@ /** Unit tests for {@link StreamingGroupAlsoByWindowsDoFns}. */ @RunWith(JUnit4.class) public class StreamingGroupAlsoByWindowFnsTest { + private static final String KEY = "k"; private static final String STATE_FAMILY = "stateFamily"; private static final long WORK_TOKEN = 1000L; @@ -500,6 +501,7 @@ public void testSessions() throws Exception { } private final class TestStepContext implements StepContext { + private StateInternals stateInternals; private TestStepContext(String stepName) { @@ -522,6 +524,7 @@ public StateInternals stateInternals() { * CombineFn API properly. */ private static class SumLongs extends CombineFn { + @Override public Long createAccumulator() { return 0L; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 93b279f0aec5..8372b33d81c8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -133,7 +133,8 @@ public void setUp() { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); } private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 334b9414b26b..f7364104f5db 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -618,7 +618,8 @@ public void testReadUnboundedReader() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); options.setNumWorkers(5); int maxElements = 10; @@ -989,7 +990,8 @@ public void testFailedWorkItemsAbort() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + /*throwExceptionOnLargeOutput=*/ false, + /*enableWindmillTagEncodingV2=*/ false); options.setNumWorkers(5); int maxElements = 100; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 7a06d3a29493..ae8a34e5ae33 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -112,6 +112,7 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class WindmillStateInternalsTest { + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); public static final Range FULL_ORDERED_LIST_RANGE = 
Range.closedOpen(WindmillOrderedList.MIN_TS_MICROS, WindmillOrderedList.MAX_TS_MICROS); @@ -3523,6 +3524,7 @@ private void forceCompactOnWrite() { } private static class MultimapEntryUpdate { + String key; Iterable values; boolean deleteAll; @@ -3535,8 +3537,12 @@ public MultimapEntryUpdate(String key, Iterable values, boolean deleteA @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof MultimapEntryUpdate)) return false; + if (this == o) { + return true; + } + if (!(o instanceof MultimapEntryUpdate)) { + return false; + } MultimapEntryUpdate that = (MultimapEntryUpdate) o; return deleteAll == that.deleteAll && Objects.equals(key, that.key) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java new file mode 100644 index 000000000000..0af699a844b6 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Enclosed.class) +public class WindmillTagEncodingV2Test { + + private static final IntervalWindow INTERVAL_WINDOW = + new IntervalWindow(new Instant(10), new Instant(20)); + + private static final CustomWindow CUSTOM_WINDOW = new CustomWindow(INTERVAL_WINDOW); + + private static final int TRIGGER_INDEX = 5; + + private static final StateNamespace GLOBAL_NAMESPACE = new GlobalNamespace(); + + private static final StateNamespace INTERVAL_WINDOW_NAMESPACE = + StateNamespaces.window(IntervalWindow.getCoder(), INTERVAL_WINDOW); + private static final StateNamespace INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE = + StateNamespaces.windowAndTrigger(IntervalWindow.getCoder(), INTERVAL_WINDOW, TRIGGER_INDEX); + + private static final StateNamespace OTHER_WINDOW_NAMESPACE = + StateNamespaces.window(new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW); + private static final StateNamespace OTHER_WINDOW_AND_TRIGGER_NAMESPACE = + StateNamespaces.windowAndTrigger( + new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW, TRIGGER_INDEX); + + // Generate a tag with length > 256, so length is encoded in two bytes. 
+ private static final String TAG = + IntStream.of(300).mapToObj(i -> "a").collect(Collectors.joining()); + + private static final StateTag> USER_STATE_TAG = + StateTags.value(TAG, VarIntCoder.of()); + private static final StateTag> SYSTEM_STATE_TAG = + StateTags.makeSystemTagInternal(StateTags.value(TAG, VarIntCoder.of())); + + private static final ByteString TAG_BYTES = encode(StringUtf8Coder.of(), TAG); + + private static final ByteString SYSTEM_STATE_TAG_BYTES = + ByteString.copyFrom(new byte[] {1}) // system tag + .concat(TAG_BYTES); + private static final ByteString USER_STATE_TAG_BYTES = + ByteString.copyFrom(new byte[] {2}) // user tag + .concat(TAG_BYTES); + + private static final ByteString GLOBAL_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x1}); // global namespace + + private static final ByteString INTERVAL_WINDOW_BYTES = + ByteString.EMPTY + .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.end())) + .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.start())); + + private static final ByteString INTERVAL_WINDOW_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window + .concat(INTERVAL_WINDOW_BYTES) + .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace + + private static final ByteString INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window + .concat(INTERVAL_WINDOW_BYTES) + .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace + .concat( + ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // big endian trigger index + + private static final ByteString OTHER_WINDOW_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window + .concat(encode(new CustomWindow.CustomWindowCoder(), 
new CustomWindow(INTERVAL_WINDOW))) + .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace + + private static final ByteString OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES = + ByteString.copyFrom(new byte[] {0x10}) // non global namespace + .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window + .concat(encode(new CustomWindow.CustomWindowCoder(), new CustomWindow(INTERVAL_WINDOW))) + .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace + .concat( + ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05})); // big endian trigger index + + private static final String TIMER_FAMILY_ID = "timerFamily"; + private static final ByteString TIMER_FAMILY_ID_BYTES = + encode(StringUtf8Coder.of(), TIMER_FAMILY_ID); + + private static final String TIMER_ID = "timerId"; + private static final ByteString TIMER_ID_BYTES = encode(StringUtf8Coder.of(), TIMER_ID); + + private static final ByteString SYSTEM_TIMER_BYTES = + ByteString.copyFrom(new byte[] {0x3}) // system timer + .concat(TIMER_FAMILY_ID_BYTES) + .concat(TIMER_ID_BYTES); + + private static final ByteString USER_TIMER_BYTES = + ByteString.copyFrom(new byte[] {0x4}) // user timer + .concat(TIMER_FAMILY_ID_BYTES) + .concat(TIMER_ID_BYTES); + + private static final ByteString SYSTEM_TIMER_BYTES_NO_FAMILY_ID = + ByteString.copyFrom(new byte[] {0x3}) // system timer + .concat(encode(StringUtf8Coder.of(), "")) + .concat(TIMER_ID_BYTES); + + private static final ByteString USER_TIMER_BYTES_NO_FAMILY_ID = + ByteString.copyFrom(new byte[] {0x4}) // user timer + .concat(encode(StringUtf8Coder.of(), "")) + .concat(TIMER_ID_BYTES); + + @RunWith(Parameterized.class) + public static class EncodeStateTagTest { + @Parameters(name = "{index}: namespace={0} stateTag={1} expectedBytes={2}") + public static Collection data() { + return ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, USER_STATE_TAG, GLOBAL_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + 
GLOBAL_NAMESPACE, + SYSTEM_STATE_TAG, + GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_STATE_TAG_BYTES) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + USER_STATE_TAG, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + USER_STATE_TAG, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + USER_STATE_TAG, + OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + USER_STATE_TAG, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES) + }); + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public StateTag stateTag; + + @Parameter(2) + public ByteString expectedBytes; + + @Test + public void testStateTag() { + assertEquals( + expectedBytes, + WindmillTagEncodingV2.instance().stateTag(namespace, stateTag).byteString()); + } + } + + @RunWith(Parameterized.class) + public static class TimerTagTest { + @Parameters( + name = + "{index}: namespace={0} prefix={1} expectedBytes={2} includeTimerId={3}" + + " includeTimerFamilyId={4} timeDomain={4}") + public static Collection data() { + List data = new ArrayList<>(); + for (boolean includeTimerFamilyId : ImmutableList.of(true, false)) { + ByteString expectedSystemTimerBytes = + includeTimerFamilyId ? SYSTEM_TIMER_BYTES : SYSTEM_TIMER_BYTES_NO_FAMILY_ID; + ByteString expectedUserTimerBytes = + includeTimerFamilyId ? 
USER_TIMER_BYTES : USER_TIMER_BYTES_NO_FAMILY_ID; + List tests = + ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes) + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes) + }); + + for (Object[] params : tests) { + for (TimeDomain timeDomain : TimeDomain.values()) { + data.add( + new Object[] {params[0], params[1], params[2], includeTimerFamilyId, timeDomain}); + } + } + } + return 
data; + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public WindmillNamespacePrefix prefix; + + @Parameter(2) + public ByteString expectedBytes; + + @Parameter(3) + public boolean includeTimerFamilyId; + + @Parameter(4) + public TimeDomain timeDomain; + + @Test + public void testTimerTag() { + TimerData timerData = + includeTimerFamilyId + ? TimerData.of( + TIMER_ID, + TIMER_FAMILY_ID, + namespace, + new Instant(123), + new Instant(456), + timeDomain) + : TimerData.of(TIMER_ID, namespace, new Instant(123), new Instant(456), timeDomain); + assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData)); + } + } + + @RunWith(Parameterized.class) + public static class TimerDataFromTimerTest { + @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} timeDomain={5}") + public static Collection data() { + List tests = + ImmutableList.of( + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + GlobalWindow.Coder.INSTANCE + }, + new Object[] { + GLOBAL_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + GlobalWindow.Coder.INSTANCE + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + INTERVAL_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }, + new Object[] { + OTHER_WINDOW_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + new 
CustomWindow.CustomWindowCoder() + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + IntervalWindow.getCoder() + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }, + new Object[] { + OTHER_WINDOW_AND_TRIGGER_NAMESPACE, + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES), + new CustomWindow.CustomWindowCoder() + }); + + List data = new ArrayList<>(); + for (Object[] params : tests) { + for (boolean draining : ImmutableList.of(true, false)) { + for (TimeDomain timeDomain : TimeDomain.values()) { + data.add( + new Object[] {params[0], params[1], params[2], params[3], draining, timeDomain}); + } + } + } + return data; + } + + @Parameter(0) + public StateNamespace namespace; + + @Parameter(1) + public WindmillNamespacePrefix prefix; + + @Parameter(2) + public ByteString timerTag; + + @Parameter(3) + public Coder windowCoder; + + @Parameter(4) + public boolean draining; + + @Parameter(5) + public TimeDomain timeDomain; + + @Test + public void testTimerDataFromTimer() { + WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance(); + Instant timestamp = Instant.now(); + Instant outputTimestamp = timestamp.plus(Duration.standardSeconds(1)); + TimerData timerData = + TimerData.of( + TIMER_ID, TIMER_FAMILY_ID, namespace, timestamp, outputTimestamp, timeDomain); + Timer timer = + Timer.newBuilder() + .setTag(timerTag) + .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) + 
.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)) + .setType(timerType(timeDomain)) + .build(); + assertEquals( + timerData, encoding.windmillTimerToTimerData(prefix, timer, windowCoder, draining)); + } + } + + @RunWith(JUnit4.class) + public static class TimerHoldTagTest { + @Test + public void testTimerHoldTagUsesTimerTag() { + TimerData timerData = + TimerData.of( + TIMER_ID, + TIMER_FAMILY_ID, + GLOBAL_NAMESPACE, + new Instant(123), + new Instant(456), + TimeDomain.EVENT_TIME); + byte[] bytes = new byte[16]; + ThreadLocalRandom.current().nextBytes(bytes); + ByteString timerTag = ByteString.copyFrom(bytes); + assertEquals( + WindmillTagEncodingV2.instance() + .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag), + timerTag); + } + } + + private static class CustomWindow extends IntervalWindow { + private CustomWindow(IntervalWindow intervalWindow) { + super(intervalWindow.start(), intervalWindow.end()); + } + + private static class CustomWindowCoder extends Coder { + + @Override + public void verifyDeterministic() throws NonDeterministicException { + IntervalWindowCoder.of().verifyDeterministic(); + } + + @Override + public List> getCoderArguments() { + return IntervalWindowCoder.of().getCoderArguments(); + } + + @Override + public void encode(CustomWindow value, OutputStream outStream) throws IOException { + IntervalWindowCoder.of().encode(value, outStream); + } + + @Override + public CustomWindow decode(InputStream inStream) throws IOException { + return new CustomWindow(IntervalWindowCoder.of().decode(inStream)); + } + } + } + + private static ByteString encode(Coder coder, T value) { + try { + ByteString.Output out = ByteString.newOutput(); + coder.encode(value, out); + return out.toByteString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static Timer.Type timerType(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return Timer.Type.WATERMARK; 
+ case PROCESSING_TIME: + return Timer.Type.REALTIME; + case SYNCHRONIZED_PROCESSING_TIME: + return Timer.Type.DEPENDENT_REALTIME; + default: + throw new IllegalArgumentException("Unrecognized TimeDomain: " + domain); + } + } +} From 155b3a0d32d8e4670339f95adb354e94ee796087 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 19 Dec 2025 13:38:07 +0000 Subject: [PATCH 3/8] Fix stream.read on exception path --- .../worker/windmill/state/WindmillTagEncodingV2.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index ba9998159177..502f24524c61 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -200,14 +200,15 @@ private StateNamespace decodeNameSpace( private StateNamespace decodeNonGlobalNamespace( InputStream stream, Coder windowCoder) throws IOException { W window = decodeWindow(stream, windowCoder); - switch (stream.read()) { + int namespaceByte = stream.read(); + switch (namespaceByte) { case 0x1: // Window namespace return StateNamespaces.window(windowCoder, window); case 0x2: // Window and trigger namespace Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream); return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); default: - throw new IllegalStateException("Invalid trigger namespace byte: " + stream.read()); + throw new IllegalStateException("Invalid trigger namespace byte: " + namespaceByte); } } From 18a7494d38b3bc3050743e3babb824946a4a63a1 Mon Sep 17 00:00:00 2001 From: Arun Pandian 
Date: Fri, 19 Dec 2025 13:52:12 +0000 Subject: [PATCH 4/8] Name magic numbers --- .../windmill/state/WindmillTagEncodingV2.java | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index 502f24524c61..0cd177d1154e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -54,6 +54,16 @@ public class WindmillTagEncodingV2 extends WindmillTagEncoding { private static final WindmillTagEncodingV2 INSTANCE = new WindmillTagEncodingV2(); + private static final int WINDOW_NAMESPACE_BYTE = 0x01; + private static final int WINDOW_AND_TRIGGER_NAMESPACE_BYTE = 0x02; + private static final int NON_GLOBAL_NAMESPACE_BYTE = 0x10; + private static final int GLOBAL_NAMESPACE_BYTE = 0x01; + private static final int SYSTEM_STATE_TAG_BYTE = 0x01; + private static final int USER_STATE_TAG_BYTE = 0x02; + private static final int SYSTEM_TIMER_BYTE = 0x03; + private static final int USER_TIMER_BYTE = 0x04; + private static final int INTERVAL_WINDOW_BYTE = 0x64; + private static final int OTHER_WINDOW_BYTE = 0x02; // Private constructor to prevent instantiations from outside. 
private WindmillTagEncodingV2() {} @@ -87,9 +97,9 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) ByteStringOutputStream stream = streamHandle.stream(); encodeNameSpace(timerData.getNamespace(), stream); if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) { - stream.write(0x03); // System namespace prefix + stream.write(SYSTEM_TIMER_BYTE); } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) { - stream.write(0x04); // User namespace prefix + stream.write(USER_TIMER_BYTE); } else { throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + prefix); } @@ -114,9 +124,9 @@ public TimerData windmillTimerToTimerData( try { StateNamespace stateNamespace = decodeNameSpace(stream, windowCoder); int nextByte = stream.read(); - if (nextByte == 0x03) { // System namespace prefix + if (nextByte == SYSTEM_TIMER_BYTE) { checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)); - } else if (nextByte == 0x04) { // User namespace prefix + } else if (nextByte == USER_TIMER_BYTE) { checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)); } else { throw new IllegalStateException("Unexpected timer tag byte: " + nextByte); @@ -158,9 +168,9 @@ public static WindmillTagEncodingV2 instance() { private void encodeAddress(StateTag tag, ByteStringOutputStream stream) throws IOException { if (StateTags.isSystemTagInternal(tag)) { - stream.write(0x01); // System tag + stream.write(SYSTEM_STATE_TAG_BYTE); // System tag } else { - stream.write(0x02); // User tag + stream.write(USER_STATE_TAG_BYTE); // User tag } StringUtf8Coder.of().encode(tag.getId(), stream); } @@ -168,15 +178,12 @@ private void encodeAddress(StateTag tag, ByteStringOutputStream stream) throw private void encodeNameSpace(StateNamespace namespace, ByteStringOutputStream stream) throws IOException { if (namespace instanceof GlobalNamespace) { - // Single byte 0x01 for GlobalNamespace. 
- stream.write(0x01); + stream.write(GLOBAL_NAMESPACE_BYTE); } else if (namespace instanceof WindowNamespace) { - // Single byte 0x10 for Non-Global namespace. - stream.write(0x10); + stream.write(NON_GLOBAL_NAMESPACE_BYTE); encodeWindowNamespace((WindowNamespace) namespace, stream); } else if (namespace instanceof WindowAndTriggerNamespace) { - // Single byte 0x10 for Non-Global namespace. - stream.write(0x10); + stream.write(NON_GLOBAL_NAMESPACE_BYTE); encodeWindowAndTriggerNamespace( (WindowAndTriggerNamespace) namespace, stream); } else { @@ -188,9 +195,9 @@ private StateNamespace decodeNameSpace( InputStream stream, Coder windowCoder) throws IOException { int firstByte = stream.read(); switch (firstByte) { - case 0x01: // GlobalNamespace + case GLOBAL_NAMESPACE_BYTE: // GlobalNamespace return StateNamespaces.global(); - case 0x10: // Non-Global namespace + case NON_GLOBAL_NAMESPACE_BYTE: // Non-Global namespace return decodeNonGlobalNamespace(stream, windowCoder); default: throw new IllegalStateException("Invalid first namespace byte: " + firstByte); @@ -202,9 +209,9 @@ private StateNamespace decodeNonGlobalNamespace( W window = decodeWindow(stream, windowCoder); int namespaceByte = stream.read(); switch (namespaceByte) { - case 0x1: // Window namespace + case WINDOW_NAMESPACE_BYTE: // Window namespace return StateNamespaces.window(windowCoder, window); - case 0x2: // Window and trigger namespace + case WINDOW_AND_TRIGGER_NAMESPACE_BYTE: // Window and trigger namespace Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream); return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); default: @@ -217,10 +224,10 @@ private W decodeWindow(InputStream stream, Coder wi int firstByte = stream.read(); W window; switch (firstByte) { - case 0x64: // IntervalWindow + case INTERVAL_WINDOW_BYTE: window = (W) decodeIntervalWindow(stream); break; - case 0x02: // Other Windows + case OTHER_WINDOW_BYTE: window = windowCoder.decode(stream); break; 
default: @@ -238,7 +245,7 @@ private IntervalWindow decodeIntervalWindow(InputStream stream) throws IOExcepti private void encodeWindowNamespace( WindowNamespace windowNamespace, ByteStringOutputStream stream) throws IOException { encodeWindow(windowNamespace.getWindow(), windowNamespace.getWindowCoder(), stream); - stream.write(0x01); // Window namespace + stream.write(WINDOW_NAMESPACE_BYTE); } private void encodeWindowAndTriggerNamespace( @@ -246,18 +253,18 @@ private void encodeWindowAndTriggerNamespace( throws IOException { encodeWindow( windowAndTriggerNamespace.getWindow(), windowAndTriggerNamespace.getWindowCoder(), stream); - stream.write(0x02); // Window and trigger namespace + stream.write(WINDOW_AND_TRIGGER_NAMESPACE_BYTE); BigEndianIntegerCoder.of().encode(windowAndTriggerNamespace.getTriggerIndex(), stream); } private void encodeWindow( W window, Coder windowCoder, ByteStringOutputStream stream) throws IOException { if (windowCoder instanceof IntervalWindowCoder) { - stream.write(0x64); // IntervalWindow + stream.write(INTERVAL_WINDOW_BYTE); InstantCoder.of().encode(((IntervalWindow) window).end(), stream); InstantCoder.of().encode(((IntervalWindow) window).start(), stream); } else { - stream.write(0x02); // Other Windows + stream.write(OTHER_WINDOW_BYTE); windowCoder.encode(window, stream); } } From c88809d4e7524fc308b5c34a9a0afa5b065c7a89 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 21 Dec 2025 13:31:38 +0000 Subject: [PATCH 5/8] Address comments --- .../windmill/state/WindmillTagEncodingV2.java | 149 +++++++++++++++++- .../state/WindmillTagEncodingV2Test.java | 59 +++++++ 2 files changed, 201 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java index 0cd177d1154e..7d11a419ceef 
100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java @@ -48,7 +48,142 @@ import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.joda.time.Instant; -/** Encodes and decodes StateTags and TimerTags from and to windmill bytes */ +/** + * Encodes and decodes StateTags and TimerTags from and to windmill bytes. This encoding scheme + * enforces a specific lexicographical order on state tags. The ordering enables building range + * filters using the tags. + * + *

1. High-Level Tag Formats

+ * + *

State tags and Timer tags differ in structure but share common component encodings. + * + *

1.1 State Tag Encoding

+ * + *

Used for generic state variables (e.g., ValueState, BagState). + * + *

+ * Format:
+ * | Encoded Namespace | Encoded Address |
+ * 
+ * + *
    + *
  • Encoded Namespace: Encodes the state namespace (see Section 2.1). + *
  • Encoded Address: Encodes the state variable address (see Section 2.3). + *
+ * + *

1.2 Timer/Timer Hold Tag Encoding

+ * + *

Specialized tags, used for timers and automatic watermark holds associated with the timers. + * + *

+ * Format:
+ * | Encoded Namespace | Tag Type | Timer Family ID | Timer ID |
+ *
+ * +-------------------+-----------------------------------------------------------+
+ * | Field             | Format                                                    |
+ * +-------------------+-----------------------------------------------------------+
+ * | Encoded Namespace | Encoded namespace (see Section 2.1).                      |
+ * +-------------------+-----------------------------------------------------------+
+ * | Tag Type          | {@code 0x03} (Single byte): System Timer/Watermark Hold   |
+ * |                   | {@code 0x04} (Single byte): User Timer/Watermark Hold     |
+ * +-------------------+-----------------------------------------------------------+
+ * | Timer Family ID   | TimerFamilyId encoded via length prefixed                 |
+ * |                   | {@code StringUtf8Coder}.                                  |
+ * +-------------------+-----------------------------------------------------------+
+ * | Timer ID          | TimerId encoded via length prefixed                       |
+ * |                   | {@code StringUtf8Coder}.                                  |
+ * +-------------------+-----------------------------------------------------------+
+ * 
+ * + *

2. Component Encodings

+ * + *

2.1 Namespace Encoding

+ * + *

Namespaces are prefixed with a byte ID to control sorting order. + * + *

+ * +---------------------------+-------------------------------------------------------------+
+ * | Namespace Type            | Format                                                      |
+ * +---------------------------+-------------------------------------------------------------+
+ * | GlobalNamespace           | | {@code 0x01} |                                            |
+ * |                           | (Single byte)                                               |
+ * +---------------------------+-------------------------------------------------------------+
+ * | WindowNamespace           | | {@code 0x10} | Encoded Window | {@code 0x01} |            |
+ * |                           | (See Section 2.2)                                           |
+ * +---------------------------+-------------------------------------------------------------+
+ * | WindowAndTriggerNamespace | | {@code 0x10} | Encoded Window | {@code 0x02} | TriggerIndex |
+ * |                           | (See Section 2.2 for Encoded Window)                        |
+ * |                           | TriggerIndex is encoded by {@code BigEndianIntegerCoder}    |
+ * +---------------------------+-------------------------------------------------------------+
+ * 
+ * + *

2.2 Window Encoding

+ * + *

2.2.1 IntervalWindow

+ * + *

IntervalWindows use a custom encoding that is different from the IntervalWindowCoder. + * + *

+ * Format:
+ * | 0x64 | End Time | Start Time |
+ * 
+ * + *
    + *
  • Prefix: {@code 0x64}. Single byte identifying Interval windows. + *
  • End Time: {@code intervalWindow.end()} encoded via {@code InstantCoder}. + *
  • Start Time: {@code intervalWindow.start()} encoded via {@code InstantCoder}. + *
+ * + *

Note: {@code InstantCoder} preserves the sort order. The encoded IntervalWindow can + * be sorted based on {@code [End Time, Start Time]} directly without needing to decode. + * + *

2.2.2 Other Windows

+ * + *

All non-IntervalWindows use the standard window coders. + * + *

+ * Format:
+ * | 0x02 | Window |
+ * 
+ * + *
    + *
  • Prefix: {@code 0x02}. Single byte identifying non-Interval windows. + *
  • Window: The window serialized using its {@code windowCoder}. + *
+ * + *

2.3 Address Encoding

+ * + *

Combines the state type and the state identifier. + * + *

+ * Format:
+ * | State Type | Address |
+ *
+ * +------------+-----------------------------------------------------------------+
+ * | Field      | Format                                                          |
+ * +------------+-----------------------------------------------------------------+
+ * | State Type | {@code 0x01} (Single byte): System State                        |
+ * |            | {@code 0x02} (Single byte): User State                          |
+ * +------------+-----------------------------------------------------------------+
+ * | Address    | The state address (string) is encoded via length prefixed       |
+ * |            | {@code StringUtf8Coder}.                                        |
+ * +------------+-----------------------------------------------------------------+
+ * 
+ * + *

3. Tag Ordering

+ * + *

The encoding prefixes are chosen to enforce the following lexicographical sort order (lowest + * to highest): + * + *

    + *
  1. Tags in Global Namespace (Prefix {@code 0x01}) + *
  2. Tags in Non-Interval Windows (Prefix {@code 0x10 0x02}) + *
  3. Tags in Interval Windows (Prefix {@code 0x10 0x64}) + *
      + *
    • Sorted internally by {@code [EndTime, StartTime]}. + *
    + *
+ */ @Internal @ThreadSafe public class WindmillTagEncodingV2 extends WindmillTagEncoding { @@ -168,9 +303,9 @@ public static WindmillTagEncodingV2 instance() { private void encodeAddress(StateTag tag, ByteStringOutputStream stream) throws IOException { if (StateTags.isSystemTagInternal(tag)) { - stream.write(SYSTEM_STATE_TAG_BYTE); // System tag + stream.write(SYSTEM_STATE_TAG_BYTE); } else { - stream.write(USER_STATE_TAG_BYTE); // User tag + stream.write(USER_STATE_TAG_BYTE); } StringUtf8Coder.of().encode(tag.getId(), stream); } @@ -195,9 +330,9 @@ private StateNamespace decodeNameSpace( InputStream stream, Coder windowCoder) throws IOException { int firstByte = stream.read(); switch (firstByte) { - case GLOBAL_NAMESPACE_BYTE: // GlobalNamespace + case GLOBAL_NAMESPACE_BYTE: return StateNamespaces.global(); - case NON_GLOBAL_NAMESPACE_BYTE: // Non-Global namespace + case NON_GLOBAL_NAMESPACE_BYTE: return decodeNonGlobalNamespace(stream, windowCoder); default: throw new IllegalStateException("Invalid first namespace byte: " + firstByte); @@ -209,9 +344,9 @@ private StateNamespace decodeNonGlobalNamespace( W window = decodeWindow(stream, windowCoder); int namespaceByte = stream.read(); switch (namespaceByte) { - case WINDOW_NAMESPACE_BYTE: // Window namespace + case WINDOW_NAMESPACE_BYTE: return StateNamespaces.window(windowCoder, window); - case WINDOW_AND_TRIGGER_NAMESPACE_BYTE: // Window and trigger namespace + case WINDOW_AND_TRIGGER_NAMESPACE_BYTE: Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream); return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); default: diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index 0af699a844b6..0e6534bd3d9c 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.windmill.state; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.InputStream; @@ -164,6 +165,7 @@ public class WindmillTagEncodingV2Test { @RunWith(Parameterized.class) public static class EncodeStateTagTest { + @Parameters(name = "{index}: namespace={0} stateTag={1} expectedBytes={2}") public static Collection data() { return ImmutableList.of( @@ -216,6 +218,7 @@ public void testStateTag() { @RunWith(Parameterized.class) public static class TimerTagTest { + @Parameters( name = "{index}: namespace={0} prefix={1} expectedBytes={2} includeTimerId={3}" @@ -323,6 +326,7 @@ public void testTimerTag() { @RunWith(Parameterized.class) public static class TimerDataFromTimerTest { + @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} timeDomain={5}") public static Collection data() { List tests = @@ -440,6 +444,7 @@ public void testTimerDataFromTimer() { @RunWith(JUnit4.class) public static class TimerHoldTagTest { + @Test public void testTimerHoldTagUsesTimerTag() { TimerData timerData = @@ -460,7 +465,61 @@ public void testTimerHoldTagUsesTimerTag() { } } + @RunWith(JUnit4.class) + public static class SortOrderTest { + + @Test + public void testSortOrder() { + WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance(); + + Instant baseInstant = Instant.now(); + // [5, 20) + StateNamespace interval5_20 = + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(baseInstant.plus(5), baseInstant.plus(20))); + // [10, 20) + StateNamespace interval10_20 = + StateNamespaces.window( + 
IntervalWindow.getCoder(), + new IntervalWindow(baseInstant.plus(10), baseInstant.plus(20))); + // [20, 30) + StateNamespace interval20_30 = + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(baseInstant.plus(20), baseInstant.plus(30))); + + ByteString globalBytes = encoding.stateTag(GLOBAL_NAMESPACE, USER_STATE_TAG).byteString(); + ByteString otherWindowBytes = + encoding.stateTag(OTHER_WINDOW_NAMESPACE, USER_STATE_TAG).byteString(); + ByteString interval5_20Bytes = encoding.stateTag(interval5_20, USER_STATE_TAG).byteString(); + ByteString interval10_20Bytes = encoding.stateTag(interval10_20, USER_STATE_TAG).byteString(); + ByteString interval20_30Bytes = encoding.stateTag(interval20_30, USER_STATE_TAG).byteString(); + + // Global < Non-Interval < Interval + assertOrdered(globalBytes, otherWindowBytes); + assertOrdered(otherWindowBytes, interval5_20Bytes); + + // Interval sorting: EndTime then StartTime + // [5, 20) < [10, 20) (Same End=20, Start 5 < 10) + assertOrdered(interval5_20Bytes, interval10_20Bytes); + // [10, 20) < [20, 30) (End 20 < 30) + assertOrdered(interval10_20Bytes, interval20_30Bytes); + + assertTrue(globalBytes.startsWith(ByteString.copyFrom(new byte[] {0x01}))); + assertTrue(otherWindowBytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x02}))); + assertTrue(interval5_20Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + assertTrue(interval10_20Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + assertTrue(interval20_30Bytes.startsWith(ByteString.copyFrom(new byte[] {0x10, 0x64}))); + } + + private void assertOrdered(ByteString smaller, ByteString larger) { + assertTrue(ByteString.unsignedLexicographicalComparator().compare(smaller, larger) < 0); + } + } + private static class CustomWindow extends IntervalWindow { + private CustomWindow(IntervalWindow intervalWindow) { super(intervalWindow.start(), intervalWindow.end()); } From ab51e97e6d798422b7705340102cd318942dafd2 Mon Sep 17 
00:00:00 2001 From: Arun Pandian Date: Sun, 21 Dec 2025 13:46:44 +0000 Subject: [PATCH 6/8] Cleanup unrelated diff --- .../worker/StreamingGroupAlsoByWindowFnsTest.java | 3 --- .../windmill/state/WindmillStateInternalsTest.java | 10 ++-------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 6992889469f0..094623b81311 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -92,7 +92,6 @@ /** Unit tests for {@link StreamingGroupAlsoByWindowsDoFns}. */ @RunWith(JUnit4.class) public class StreamingGroupAlsoByWindowFnsTest { - private static final String KEY = "k"; private static final String STATE_FAMILY = "stateFamily"; private static final long WORK_TOKEN = 1000L; @@ -501,7 +500,6 @@ public void testSessions() throws Exception { } private final class TestStepContext implements StepContext { - private StateInternals stateInternals; private TestStepContext(String stepName) { @@ -524,7 +522,6 @@ public StateInternals stateInternals() { * CombineFn API properly. 
*/ private static class SumLongs extends CombineFn { - @Override public Long createAccumulator() { return 0L; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index ae8a34e5ae33..7a06d3a29493 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -112,7 +112,6 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class WindmillStateInternalsTest { - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); public static final Range FULL_ORDERED_LIST_RANGE = Range.closedOpen(WindmillOrderedList.MIN_TS_MICROS, WindmillOrderedList.MAX_TS_MICROS); @@ -3524,7 +3523,6 @@ private void forceCompactOnWrite() { } private static class MultimapEntryUpdate { - String key; Iterable values; boolean deleteAll; @@ -3537,12 +3535,8 @@ public MultimapEntryUpdate(String key, Iterable values, boolean deleteA @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof MultimapEntryUpdate)) { - return false; - } + if (this == o) return true; + if (!(o instanceof MultimapEntryUpdate)) return false; MultimapEntryUpdate that = (MultimapEntryUpdate) o; return deleteAll == that.deleteAll && Objects.equals(key, that.key) From 311ca25c438414d514ce2f1a17e65b2c638790a9 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 21 Dec 2025 13:50:53 +0000 Subject: [PATCH 7/8] doc fix --- .../dataflow/worker/windmill/state/WindmillTagEncoding.java | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java index caef4b292d82..a979a1d982c4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -54,6 +54,8 @@ public abstract class WindmillTagEncoding { /** * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark * hold that is only freed after the timer fires. + * + * @param timerTag tag of the timer that maps to the hold. */ public abstract ByteString timerHoldTag( WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag); From 724a6effde65f75cd3875bfff9b6ac90e7d9f087 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sun, 21 Dec 2025 14:42:07 +0000 Subject: [PATCH 8/8] spotless fix --- .../worker/windmill/state/WindmillTagEncodingV2Test.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java index 0e6534bd3d9c..af9ef95410d1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java @@ -477,17 +477,20 @@ public void testSortOrder() { StateNamespace interval5_20 = 
StateNamespaces.window( IntervalWindow.getCoder(), - new IntervalWindow(baseInstant.plus(5), baseInstant.plus(20))); + new IntervalWindow( + baseInstant.plus(Duration.millis(5)), baseInstant.plus(Duration.millis(20)))); // [10, 20) StateNamespace interval10_20 = StateNamespaces.window( IntervalWindow.getCoder(), - new IntervalWindow(baseInstant.plus(10), baseInstant.plus(20))); + new IntervalWindow( + baseInstant.plus(Duration.millis(10)), baseInstant.plus(Duration.millis(20)))); // [20, 30) StateNamespace interval20_30 = StateNamespaces.window( IntervalWindow.getCoder(), - new IntervalWindow(baseInstant.plus(20), baseInstant.plus(30))); + new IntervalWindow( + baseInstant.plus(Duration.millis(20)), baseInstant.plus(Duration.millis(30)))); ByteString globalBytes = encoding.stateTag(GLOBAL_NAMESPACE, USER_STATE_TAG).byteString(); ByteString otherWindowBytes =