From 437cd3c59232e9f9e599fb7455fb49e76c00d41a Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 2 Dec 2025 15:55:37 +0100 Subject: [PATCH 1/2] [OpenTelemetry] Add traceparent and tracestate to extended element metadata proto From 4de302516717280296e00e04b6df89748ea432ba Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 2 Dec 2025 13:44:10 +0100 Subject: [PATCH 2/2] Model changes to allow OpenTelemetry context propagation dsadasdsa dsd --- .../beam/gradle/BeamModulePlugin.groovy | 12 +- .../google-cloud-dataflow-java/build.gradle | 1 + .../runners/dataflow/BatchViewOverrides.java | 6 + .../worker/build.gradle | 1 + .../worker/UngroupedWindmillReader.java | 14 +- .../worker/WindmillKeyedWorkItem.java | 10 +- .../worker/util/ValueInEmptyWindows.java | 6 + runners/spark/spark_runner.gradle | 1 + .../beam/runners/spark/util/TimerUtils.java | 6 + .../license_scripts/dep_urls_java.yaml | 4 +- sdks/java/core/build.gradle | 1 + .../beam/sdk/transforms/DoFnTester.java | 6 +- .../OpenTelemetryContextPropagator.java | 71 +++++++ .../apache/beam/sdk/values/OutputBuilder.java | 3 + .../beam/sdk/values/ValueInSingleWindow.java | 40 +++- .../apache/beam/sdk/values/WindowedValue.java | 4 + .../beam/sdk/values/WindowedValues.java | 201 +++++++++++++----- .../beam/sdk/util/WindowedValueTest.java | 21 +- .../extensions/gcp/GcpCoreApiSurfaceTest.java | 1 + 19 files changed, 340 insertions(+), 69 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index daa8ed85e524..10e26672bf6c 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -635,7 +635,8 @@ class BeamModulePlugin implements Plugin { // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom def netty_version = "4.1.124.Final" // [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom - def opentelemetry_version = "1.52.0" + def opentelemetry_sdk_version = "1.56.0" + def opentelemetry_contrib_version = "1.52.0" def postgres_version = "42.2.16" // [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom def protobuf_version = "4.33.0" @@ -857,8 +858,13 @@ class BeamModulePlugin implements Plugin { netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final", netty_transport : "io.netty:netty-transport:$netty_version", netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version", - opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version - opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions + opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_sdk_version", + opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_sdk_version-alpha", // alpha required by extensions + opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_sdk_version", + opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha", + opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_sdk_version", + opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_sdk_version", + opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_sdk_version", postgres : "org.postgresql:postgresql:$postgres_version", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 50498d24c624..56496bdd95a1 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -112,6 +112,7 @@ dependencies { implementation library.java.google_http_client implementation library.java.google_http_client_gson permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761 + implementation library.java.opentelemetry_context implementation library.java.hamcrest implementation library.java.jackson_annotations implementation library.java.jackson_core diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index e7bb4dc9c0ac..2dc08d94aa5f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.opentelemetry.context.Context; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -1403,6 +1404,11 @@ public PaneInfo getPaneInfo() { return null; } + @Override + public @Nullable Context getOpenTelemetryContext() { + return null; + } + @Override public @Nullable Long getRecordOffset() { return null; diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index 4068c5f88e4f..1fd6b428d7fd 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -207,6 +207,7 @@ dependencies { implementation library.java.jackson_core implementation library.java.jackson_databind implementation library.java.joda_time + implementation library.java.opentelemetry_context implementation library.java.slf4j_api implementation library.java.vendored_grpc_1_69_0 implementation library.java.error_prone_annotations diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index c248259a12de..4f136385ba6f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -137,10 +137,19 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce @SuppressWarnings("unchecked") T result = (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data)); + // todo #37030 parse context from previous stage return WindowedValues.of( - result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream); + result, + timestampMillis, + windows, + paneInfo, + null, + null, + drainingValueFromUpstream, + null); } else { notifyElementRead(data.available() + metadata.available()); + // todo #37030 parse context from previous stage return WindowedValues.of( decode(valueCoder, data), timestampMillis, @@ -148,7 +157,8 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce paneInfo, null, null, - drainingValueFromUpstream); + drainingValueFromUpstream, + null); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 415dab526bb5..cc6b96887732 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -129,8 +129,16 @@ public Iterable> elementsIterable() { } InputStream inputStream = message.getData().newInput(); ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); + // todo #37030 parse context from previous stage return WindowedValues.of( - value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream); + value, + timestamp, + windows, + paneInfo, + null, + null, + drainingValueFromUpstream, + null); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 00bb282c6845..3d73b80ecd55 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.util; +import io.opentelemetry.context.Context; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -64,6 +65,11 @@ public boolean causedByDrain() { return false; } + @Override + public @Nullable Context getOpenTelemetryContext() { + return null; + } + @Override public Iterable> explodeWindows() { return Collections.emptyList(); diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index ecdfc8f0f697..74665fbdc506 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -167,6 +167,7 @@ dependencies { implementation library.java.jackson_annotations implementation library.java.slf4j_api implementation library.java.joda_time + implementation library.java.opentelemetry_context implementation library.java.commons_lang3 implementation library.java.args4j implementation project(path: ":model:fn-execution", configuration: "shadow") diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java index 0be36d67388c..f9f2861d950e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.spark.util; +import io.opentelemetry.context.Context; import java.io.Serializable; import java.util.Collection; import java.util.Collections; @@ -115,6 +116,11 @@ public PaneInfo getPaneInfo() { return null; } + @Override + public @Nullable Context getOpenTelemetryContext() { + return null; + } + @Override public boolean causedByDrain() { return false; diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index b1b0c32c1b67..27f67222708f 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -66,11 +66,11 @@ org.eclipse.jgit: license: "https://www.eclipse.org/org/documents/edl-v10.html" type: "Eclipse Distribution License - v1.0" opentelemetry-bom: - '1.52.0': + '1.56.0': license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE" type: "Apache License 2.0" opentelemetry-bom-alpha: - '1.52.0-alpha': + '1.56.0-alpha': license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE" type: "Apache License 2.0" zstd-jni: diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 74b6dfe4bba7..14be9cb375a7 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,6 +98,7 @@ dependencies { shadow library.java.jackson_databind shadow platform(library.java.opentelemetry_bom) shadow library.java.opentelemetry_api + shadow library.java.opentelemetry_context shadow library.java.slf4j_api shadow library.java.snappy_java shadow library.java.joda_time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 3bdeb57ed888..634b8ed8c71b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -490,7 +490,7 @@ public void output(TupleTag tag, T output, Instant timestamp, BoundedWind getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, window, PaneInfo.NO_FIRING, null, null)); + output, timestamp, window, PaneInfo.NO_FIRING, null, null, null)); } }; } @@ -623,7 +623,7 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add( ValueInSingleWindow.of( - output, timestamp, element.getWindow(), element.getPaneInfo(), null, null)); + output, timestamp, element.getWindow(), element.getPaneInfo(), null, null, null)); } @Override @@ -635,7 +635,7 @@ public void outputWindowedValue( PaneInfo paneInfo) { for (BoundedWindow w : windows) { getMutableOutput(tag) - .add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null)); + .add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null, null)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java new file mode 100644 index 000000000000..11374ec8c458 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OpenTelemetryContextPropagator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapSetter; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; + +class OpenTelemetryContextPropagator { + + private static final TextMapSetter SETTER = + (carrier, key, value) -> { + if (carrier == null) { + return; + } + if ("traceparent".equals(key)) { + carrier.setTraceparent(value); + } else if ("tracestate".equals(key)) { + carrier.setTracestate(value); + } + }; + + private static final TextMapGetter GETTER = + new TextMapGetter() { + @Override + public Iterable keys(BeamFnApi.Elements.ElementMetadata carrier) { + return Lists.newArrayList("traceparent", "tracestate"); + } + + @Override + public @Nullable String get( + BeamFnApi.Elements.@Nullable ElementMetadata carrier, String key) { + if (carrier == null) { + return null; + } + if ("traceparent".equals(key)) { + return carrier.getTraceparent(); + } else if ("tracestate".equals(key)) { + return carrier.getTracestate(); + } + return null; + } + }; + + static void write(Context from, BeamFnApi.Elements.ElementMetadata.Builder builder) { + W3CTraceContextPropagator.getInstance().inject(from, builder, SETTER); + } + + static Context read(BeamFnApi.Elements.ElementMetadata from) { + return W3CTraceContextPropagator.getInstance().extract(Context.root(), from, GETTER); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java index 03e3088e5256..960d5c2184c2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.values; +import io.opentelemetry.context.Context; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -50,5 +51,7 @@ public interface OutputBuilder extends WindowedValue { OutputBuilder setCausedByDrain(boolean causedByDrain); + OutputBuilder setOpenTelemetryContext(@Nullable Context openTelemetryContext); + void output(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 21df11119831..e59c780f4cc7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.values; import com.google.auto.value.AutoValue; +import io.opentelemetry.context.Context; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -66,6 +67,8 @@ public T getValue() { public abstract @Nullable Long getCurrentRecordOffset(); + public abstract @Nullable Context getOpenTelemetryContext(); + // todo #33176 specify additional metadata in the future public static ValueInSingleWindow of( T value, @@ -73,14 +76,21 @@ public static ValueInSingleWindow of( BoundedWindow window, PaneInfo paneInfo, @Nullable String currentRecordId, - @Nullable Long currentRecordOffset) { + @Nullable Long currentRecordOffset, + @Nullable Context openTelemetryContext) { return new AutoValue_ValueInSingleWindow<>( - value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset); + value, + timestamp, + window, + paneInfo, + currentRecordId, + currentRecordOffset, + openTelemetryContext); } public static ValueInSingleWindow of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return of(value, timestamp, window, paneInfo, null, null); + return of(value, timestamp, window, paneInfo, null, null, null); } /** A coder for {@link ValueInSingleWindow}. */ @@ -105,11 +115,12 @@ public static Coder of( @Override public void encode(ValueInSingleWindow windowedElem, OutputStream outStream) throws IOException { - encode(windowedElem, outStream, Context.NESTED); + encode(windowedElem, outStream, Coder.Context.NESTED); } @Override - public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, Context context) + public void encode( + ValueInSingleWindow windowedElem, OutputStream outStream, Coder.Context context) throws IOException { InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowCoder.encode(windowedElem.getWindow(), outStream); @@ -120,6 +131,12 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); // todo #33176 specify additional metadata in the future + io.opentelemetry.context.Context openTelemetryContext = + windowedElem.getOpenTelemetryContext(); + if (openTelemetryContext != null) { + + OpenTelemetryContextPropagator.write(openTelemetryContext, builder); + } BeamFnApi.Elements.ElementMetadata metadata = builder.build(); ByteArrayCoder.of().encode(metadata.toByteArray(), outStream); } @@ -129,22 +146,27 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, @Override public ValueInSingleWindow decode(InputStream inStream) throws IOException { - return decode(inStream, Context.NESTED); + return decode(inStream, Coder.Context.NESTED); } @Override @SuppressWarnings("IgnoredPureGetter") - public ValueInSingleWindow decode(InputStream inStream, Context context) throws IOException { + public ValueInSingleWindow decode(InputStream inStream, Coder.Context context) + throws IOException { Instant timestamp = InstantCoder.of().decode(inStream); BoundedWindow window = windowCoder.decode(inStream); PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream); + io.opentelemetry.context.Context openTelemetryContext = null; if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) { - BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); + BeamFnApi.Elements.ElementMetadata elementMetadata = + BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); + openTelemetryContext = OpenTelemetryContextPropagator.read(elementMetadata); } T value = valueCoder.decode(inStream, context); // todo #33176 specify additional metadata in the future - return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null); + return new AutoValue_ValueInSingleWindow<>( + value, timestamp, window, paneInfo, null, null, openTelemetryContext); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java index bcd58b903171..6f00347fcb85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.values; +import io.opentelemetry.context.Context; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -49,6 +50,9 @@ public interface WindowedValue { @Nullable String getRecordId(); + @Nullable + Context getOpenTelemetryContext(); + @Nullable Long getRecordOffset(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index b194207000ed..5f017a1dc3b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import io.opentelemetry.context.Context; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -77,7 +78,8 @@ public static Builder builder(WindowedValue template) { .setValue(template.getValue()) .setTimestamp(template.getTimestamp()) .setWindows(template.getWindows()) - .setPaneInfo(template.getPaneInfo()); + .setPaneInfo(template.getPaneInfo()) + .setOpenTelemetryContext(template.getOpenTelemetryContext()); } public static class Builder implements OutputBuilder { @@ -99,6 +101,7 @@ public static class Builder implements OutputBuilder { private @MonotonicNonNull Collection windows; private @Nullable String recordId; private @Nullable Long recordOffset; + private @Nullable Context openTelemetryContext; private boolean causedByDrain; @Override @@ -149,6 +152,12 @@ public Builder setCausedByDrain(boolean causedByDrain) { return this; } + @Override + public Builder setOpenTelemetryContext(@Nullable Context openTelemetryContext) { + this.openTelemetryContext = openTelemetryContext; + return this; + } + public Builder setReceiver(WindowedValueReceiver receiver) { this.receiver = receiver; return this; @@ -175,6 +184,11 @@ public Instant getTimestamp() { return timestamp; } + @Override + public @Nullable Context getOpenTelemetryContext() { + return openTelemetryContext; + } + @Override public Collection getWindows() { checkStateNotNull(windows, "Windows not set"); @@ -237,7 +251,8 @@ public WindowedValue build() { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getOpenTelemetryContext()); } @Override @@ -255,7 +270,7 @@ public String toString() { public static WindowedValue of( T value, Instant timestamp, Collection windows, PaneInfo paneInfo) { - return of(value, timestamp, windows, paneInfo, null, null, false); + return of(value, timestamp, windows, paneInfo, null, null, false, null); } /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ @@ -266,15 +281,31 @@ public static WindowedValue of( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + boolean causedByDrain, + @Nullable Context openTelemetryContext) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + value, + timestamp, + windows, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); } } @@ -285,12 +316,21 @@ static WindowedValue createWithoutValidation( Instant timestamp, Collection windows, PaneInfo paneInfo, - boolean causedByDrain) { + boolean causedByDrain, + @Nullable Context openTelemetryContext) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + null, + null, + causedByDrain, + openTelemetryContext); } else { return new TimestampedValueInMultipleWindows<>( - value, timestamp, windows, paneInfo, null, null, causedByDrain); + value, timestamp, windows, paneInfo, null, null, causedByDrain, openTelemetryContext); } } @@ -299,12 +339,19 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, false); + return of(value, timestamp, window, paneInfo, null, null, false, null); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo, boolean causedByDrain) { + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, + boolean causedByDrain, + @Nullable Context openTelemetryContext) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); @@ -312,10 +359,23 @@ public static WindowedValue of( return valueInGlobalWindow(value, paneInfo); } else if (isGlobal) { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, null, null, causedByDrain); + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); } else { return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, null, null, causedByDrain); + value, + timestamp, + window, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); } } @@ -324,7 +384,7 @@ public static WindowedValue of( * default timestamp and pane. */ public static WindowedValue valueInGlobalWindow(T value) { - return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, false); + return new ValueInGlobalWindow<>(value, PaneInfo.NO_FIRING, null, null, false, null); } /** @@ -332,7 +392,7 @@ public static WindowedValue valueInGlobalWindow(T value) { * default timestamp and the specified pane. */ public static WindowedValue valueInGlobalWindow(T value, PaneInfo paneInfo) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, false); + return new ValueInGlobalWindow<>(value, paneInfo, null, null, false, null); } /** @@ -344,7 +404,7 @@ public static WindowedValue timestampedValueInGlobalWindow(T value, Insta return valueInGlobalWindow(value); } else { return new TimestampedValueInGlobalWindow<>( - value, timestamp, PaneInfo.NO_FIRING, null, null, false); + value, timestamp, PaneInfo.NO_FIRING, null, null, false, null); } } @@ -357,7 +417,8 @@ public static WindowedValue timestampedValueInGlobalWindow( if (paneInfo.equals(PaneInfo.NO_FIRING)) { return timestampedValueInGlobalWindow(value, timestamp); } else { - return new TimestampedValueInGlobalWindow<>(value, timestamp, paneInfo, null, null, false); + return new TimestampedValueInGlobalWindow<>( + value, timestamp, paneInfo, null, null, false, null); } } @@ -374,7 +435,8 @@ public static WindowedValue withValue( windowedValue.getPaneInfo(), windowedValue.getRecordId(), windowedValue.getRecordOffset(), - windowedValue.causedByDrain()); + windowedValue.causedByDrain(), + windowedValue.getOpenTelemetryContext()); } public static boolean equals( @@ -426,6 +488,7 @@ private abstract static class SimpleWindowedValue implements WindowedValue private final @Nullable String currentRecordId; private final @Nullable Long currentRecordOffset; private final boolean causedByDrain; + private final @Nullable Context context; @Override public @Nullable String getRecordId() { @@ -447,12 +510,14 @@ protected SimpleWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { + boolean causedByDrain, + @Nullable Context context) { this.value = value; this.paneInfo = checkNotNull(paneInfo); this.currentRecordId = currentRecordId; this.currentRecordOffset = currentRecordOffset; this.causedByDrain = causedByDrain; + this.context = context; } @Override @@ -465,6 +530,11 @@ public T getValue() { return value; } + @Override + public @Nullable Context getOpenTelemetryContext() { + return context; + } + @Override public Iterable> explodeWindows() { if (this.getWindows().size() == 1) { @@ -501,8 +571,9 @@ public MinTimestampWindowedValue( PaneInfo pane, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, pane, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context context) { + super(value, pane, currentRecordId, currentRecordOffset, causedByDrain, context); } @Override @@ -520,8 +591,9 @@ public ValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context context) { + super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context); } @Override @@ -537,7 +609,12 @@ public BoundedWindow getWindow() { @Override public WindowedValue withValue(NewT newValue) { return new ValueInGlobalWindow<>( - newValue, getPaneInfo(), getRecordId(), getRecordOffset(), causedByDrain()); + newValue, + getPaneInfo(), + getRecordId(), + getRecordOffset(), + causedByDrain(), + getOpenTelemetryContext()); } @Override @@ -576,8 +653,9 @@ public TimestampedWindowedValue( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context context) { + super(value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context); this.timestamp = checkNotNull(timestamp); } @@ -600,8 +678,10 @@ public TimestampedValueInGlobalWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context context) { + super( + value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain, context); } @Override @@ -622,7 +702,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getOpenTelemetryContext()); } @Override @@ -673,8 +754,16 @@ public TimestampedValueInSingleWindow( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context openTelemetryContext) { + super( + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); this.window = checkNotNull(window); } @@ -687,7 +776,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getOpenTelemetryContext()); } @Override @@ -745,8 +835,16 @@ public TimestampedValueInMultipleWindows( PaneInfo paneInfo, @Nullable String currentRecordId, @Nullable Long currentRecordOffset, - boolean causedByDrain) { - super(value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); + boolean causedByDrain, + @Nullable Context openTelemetryContext) { + super( + value, + timestamp, + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain, + openTelemetryContext); this.windows = checkNotNull(windows); } @@ -764,7 +862,8 @@ public WindowedValue withValue(NewT newValue) { getPaneInfo(), getRecordId(), getRecordOffset(), - causedByDrain()); + causedByDrain(), + getOpenTelemetryContext()); } @Override @@ -904,11 +1003,11 @@ public WindowedValueCoder withValueCoder(Coder valueCoder) { @Override public void encode(WindowedValue windowedElem, OutputStream outStream) throws CoderException, IOException { - encode(windowedElem, outStream, Context.NESTED); + encode(windowedElem, outStream, Coder.Context.NESTED); } @Override - public void encode(WindowedValue windowedElem, OutputStream outStream, Context context) + public void encode(WindowedValue windowedElem, OutputStream outStream, Coder.Context context) throws CoderException, IOException { InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowsCoder.encode(windowedElem.getWindows(), outStream); @@ -918,6 +1017,10 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex if (metadataSupported) { BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); + io.opentelemetry.context.Context context1 = windowedElem.getOpenTelemetryContext(); + if (context1 != null) { + OpenTelemetryContextPropagator.write(context1, builder); + } BeamFnApi.Elements.ElementMetadata em = builder .setDrain( @@ -933,16 +1036,17 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex @Override public WindowedValue decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Context.NESTED); + return decode(inStream, Coder.Context.NESTED); } @Override - public WindowedValue decode(InputStream inStream, Context context) + public WindowedValue decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { Instant timestamp = InstantCoder.of().decode(inStream); Collection windows = windowsCoder.decode(inStream); PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream); boolean causedByDrain = false; + io.opentelemetry.context.Context openTelemetryContext = null; if (isMetadataSupported() && paneInfo.isElementMetadata()) { BeamFnApi.Elements.ElementMetadata elementMetadata = BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream)); @@ -951,13 +1055,14 @@ public WindowedValue decode(InputStream inStream, Context context) b ? elementMetadata.getDrain().equals(BeamFnApi.Elements.DrainMode.Enum.DRAINING) : false; + openTelemetryContext = OpenTelemetryContextPropagator.read(elementMetadata); } T value = valueCoder.decode(inStream, context); // Because there are some remaining (incorrect) uses of WindowedValue with no windows, // we call this deprecated no-validation path when decoding return WindowedValues.createWithoutValidation( - value, timestamp, windows, paneInfo, causedByDrain); + value, timestamp, windows, paneInfo, causedByDrain, openTelemetryContext); } @Override @@ -1023,22 +1128,22 @@ public WindowedValueCoder withValueCoder(Coder valueCoder) { @Override public void encode(WindowedValue windowedElem, OutputStream outStream) throws CoderException, IOException { - encode(windowedElem, outStream, Context.NESTED); + encode(windowedElem, outStream, Coder.Context.NESTED); } @Override - public void encode(WindowedValue windowedElem, OutputStream outStream, Context context) + public void encode(WindowedValue windowedElem, OutputStream outStream, Coder.Context context) throws CoderException, IOException { valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override public WindowedValue decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Context.NESTED); + return decode(inStream, Coder.Context.NESTED); } @Override - public WindowedValue decode(InputStream inStream, Context context) + public WindowedValue decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { T value = valueCoder.decode(inStream, context); return WindowedValues.valueInGlobalWindow(value); @@ -1134,22 +1239,22 @@ public WindowedValueCoder withValueCoder(Coder valueCoder) { @Override public void encode(WindowedValue windowedElem, OutputStream outStream) throws CoderException, IOException { - encode(windowedElem, outStream, Context.NESTED); + encode(windowedElem, outStream, Coder.Context.NESTED); } @Override - public void encode(WindowedValue windowedElem, OutputStream outStream, Context context) + public void encode(WindowedValue windowedElem, OutputStream outStream, Coder.Context context) throws CoderException, IOException { valueCoder.encode(windowedElem.getValue(), outStream, context); } @Override public WindowedValue decode(InputStream inStream) throws CoderException, IOException { - return decode(inStream, Context.NESTED); + return decode(inStream, Coder.Context.NESTED); } @Override - public WindowedValue decode(InputStream inStream, Context context) + public WindowedValue decode(InputStream inStream, Coder.Context context) throws CoderException, IOException { return WindowedValues.withValue(windowedValuePrototype, valueCoder.decode(inStream, context)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 915399311859..2e5cde0d8132 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -23,6 +23,11 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; import java.util.Arrays; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -80,6 +85,17 @@ public void testWindowedValueCoder() throws CoderException { @Test public void testWindowedValueWithElementMetadataCoder() throws CoderException { WindowedValues.WindowedValueCoder.setMetadataSupported(); + + Context context = + Context.current() + .with( + Span.wrap( + SpanContext.create( + "ff000000000000000000000000000041", + "ff00000000000041", + TraceFlags.getSampled(), + TraceState.builder().put("foo", "bar").put("bar", "baz").build()))); + Instant timestamp = new Instant(1234); WindowedValue value = WindowedValues.of( @@ -92,12 +108,14 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { PaneInfo.NO_FIRING, null, null, - true); // drain is persisted as part of metadata + true, + context); Coder> windowedValueCoder = WindowedValues.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); byte[] encodedValue = CoderUtils.encodeToByteArray(windowedValueCoder, value); + WindowedValue decodedValue = CoderUtils.decodeFromByteArray(windowedValueCoder, encodedValue); @@ -105,6 +123,7 @@ public void testWindowedValueWithElementMetadataCoder() throws CoderException { Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); Assert.assertTrue(value.causedByDrain()); + Assert.assertNotNull(value.getOpenTelemetryContext()); } @Test diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index b66e1f16fc93..d8fb55cfd1dd 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -58,6 +58,7 @@ public void testGcpCoreApiSurface() throws Exception { classesInPackage("com.fasterxml.jackson.annotation"), classesInPackage("com.google.cloud.hadoop.gcsio"), classesInPackage("com.google.common.collect"), // Via gcs-connector ReadOptions builder + classesInPackage("io.opentelemetry"), // open telemetry classesInPackage("java"), classesInPackage("javax"), classesInPackage("org.apache.beam.model.pipeline.v1"),