Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

@Override
public String stringKey() {
try {
Expand Down Expand Up @@ -170,6 +174,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

public int getTriggerIndex() {
return triggerIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ private StateTags() {}

private interface SystemStateTag<StateT extends State> {
StateTag<StateT> asKind(StateKind kind);

StateKind getKind();
}

/** Create a state tag for the given id and spec. */
Expand Down Expand Up @@ -243,6 +245,16 @@ public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
return typedTag.asKind(StateKind.SYSTEM);
}

/*
* Returns true if the tag is a system internal tag.
*/
public static <StateT extends State> boolean isSystemTagInternal(StateTag<StateT> tag) {
if (!(tag instanceof SystemStateTag)) {
return false;
}
return StateKind.SYSTEM.equals(((SystemStateTag<?>) tag).getKind());
}

public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal(
StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
Expand Down Expand Up @@ -358,6 +370,11 @@ public StateTag<StateT> asKind(StateKind kind) {
return new SimpleStateTag<>(id.asKind(kind), spec);
}

@Override
public StateKind getKind() {
return id.kind;
}

@Override
public boolean equals(@Nullable Object other) {
if (!(other instanceof SimpleStateTag)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -154,13 +156,14 @@ public StreamingModeExecutionContext(
String computationId,
ReaderCache readerCache,
Map<String, String> stateNameMap,
WindmillStateCache.ForComputation stateCache,
ForComputation stateCache,
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
boolean throwExceptionOnLargeOutput,
boolean enableWindmillTagEncodingV2) {
super(
counterFactory,
metricsContainerRegistry,
Expand All @@ -171,7 +174,10 @@ public StreamingModeExecutionContext(
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.windmillTagEncoding = WindmillTagEncodingV1.instance();
this.windmillTagEncoding =
enableWindmillTagEncodingV2
? WindmillTagEncodingV2.instance()
: WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
Expand All @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Clear the hold in case a previous iteration of this timer set one.
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ public abstract class WindmillTagEncoding {
/**
* Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark
* hold that is only freed after the timer fires.
*
* @param timerTag tag of the timer that maps to the hold.
*/
public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData);
public abstract ByteString timerHoldTag(
WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag);

/**
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag<?> address

/** {@inheritDoc} */
@Override
public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) {
public ByteString timerHoldTag(
WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) {
String tagString;
if ("".equals(timerData.getTimerFamilyId())) {
tagString =
Expand Down
Loading
Loading