Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ class BeamModulePlugin implements Plugin<Project> {
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
def netty_version = "4.1.124.Final"
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom
def opentelemetry_version = "1.52.0"
def opentelemetry_sdk_version = "1.56.0"
def opentelemetry_contrib_version = "1.52.0"
def postgres_version = "42.2.16"
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
def protobuf_version = "4.33.0"
Expand Down Expand Up @@ -857,8 +858,13 @@ class BeamModulePlugin implements Plugin<Project> {
netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
netty_transport : "io.netty:netty-transport:$netty_version",
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_sdk_version",
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_sdk_version-alpha", // alpha required by extensions
opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_sdk_version",
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha",
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_sdk_version",
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_sdk_version",
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_sdk_version",
postgres : "org.postgresql:postgresql:$postgres_version",
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ dependencies {
implementation library.java.google_http_client
implementation library.java.google_http_client_gson
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
implementation library.java.opentelemetry_context
implementation library.java.hamcrest
implementation library.java.jackson_annotations
implementation library.java.jackson_core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -1403,6 +1404,11 @@ public PaneInfo getPaneInfo() {
return null;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public @Nullable Long getRecordOffset() {
return null;
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ dependencies {
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.joda_time
implementation library.java.opentelemetry_context
implementation library.java.slf4j_api
implementation library.java.vendored_grpc_1_69_0
implementation library.java.error_prone_annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,28 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
@SuppressWarnings("unchecked")
T result =
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
// todo #37030 parse context from previous stage
return WindowedValues.of(
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
result,
timestampMillis,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream,
null);
} else {
notifyElementRead(data.available() + metadata.available());
// todo #37030 parse context from previous stage
return WindowedValues.of(
decode(valueCoder, data),
timestampMillis,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream);
drainingValueFromUpstream,
null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,16 @@ public Iterable<WindowedValue<ElemT>> elementsIterable() {
}
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
// todo #37030 parse context from previous stage
return WindowedValues.of(
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
value,
timestamp,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream,
null);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.util;

import io.opentelemetry.context.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand Down Expand Up @@ -64,6 +65,11 @@ public boolean causedByDrain() {
return false;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
1 change: 1 addition & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ dependencies {
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.opentelemetry_context
implementation library.java.commons_lang3
implementation library.java.args4j
implementation project(path: ":model:fn-execution", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.spark.util;

import io.opentelemetry.context.Context;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -115,6 +116,11 @@ public PaneInfo getPaneInfo() {
return null;
}

@Override
public @Nullable Context getOpenTelemetryContext() {
return null;
}

@Override
public boolean causedByDrain() {
return false;
Expand Down
4 changes: 2 additions & 2 deletions sdks/java/container/license_scripts/dep_urls_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ org.eclipse.jgit:
license: "https://www.eclipse.org/org/documents/edl-v10.html"
type: "Eclipse Distribution License - v1.0"
opentelemetry-bom:
'1.52.0':
'1.56.0':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE"
type: "Apache License 2.0"
opentelemetry-bom-alpha:
'1.52.0-alpha':
'1.56.0-alpha':
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE"
type: "Apache License 2.0"
zstd-jni:
Expand Down
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ dependencies {
shadow library.java.jackson_databind
shadow platform(library.java.opentelemetry_bom)
shadow library.java.opentelemetry_api
shadow library.java.opentelemetry_context
shadow library.java.slf4j_api
shadow library.java.snappy_java
shadow library.java.joda_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWind
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
output, timestamp, window, PaneInfo.NO_FIRING, null, null));
output, timestamp, window, PaneInfo.NO_FIRING, null, null, null));
}
};
}
Expand Down Expand Up @@ -623,7 +623,7 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null));
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null, null));
}

@Override
Expand All @@ -635,7 +635,7 @@ public <T> void outputWindowedValue(
PaneInfo paneInfo) {
for (BoundedWindow w : windows) {
getMutableOutput(tag)
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null));
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null, null));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;

import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;

class OpenTelemetryContextPropagator {

private static final TextMapSetter<BeamFnApi.Elements.ElementMetadata.Builder> SETTER =
(carrier, key, value) -> {
if (carrier == null) {
return;
}
if ("traceparent".equals(key)) {
carrier.setTraceparent(value);
} else if ("tracestate".equals(key)) {
carrier.setTracestate(value);
}
};

private static final TextMapGetter<BeamFnApi.Elements.ElementMetadata> GETTER =
new TextMapGetter<BeamFnApi.Elements.ElementMetadata>() {
@Override
public Iterable<String> keys(BeamFnApi.Elements.ElementMetadata carrier) {
return Lists.newArrayList("traceparent", "tracestate");
}

@Override
public @Nullable String get(
BeamFnApi.Elements.@Nullable ElementMetadata carrier, String key) {
if (carrier == null) {
return null;
}
if ("traceparent".equals(key)) {
return carrier.getTraceparent();
} else if ("tracestate".equals(key)) {
return carrier.getTracestate();
}
return null;
}
};

static void write(Context from, BeamFnApi.Elements.ElementMetadata.Builder builder) {
W3CTraceContextPropagator.getInstance().inject(from, builder, SETTER);
}

static Context read(BeamFnApi.Elements.ElementMetadata from) {
return W3CTraceContextPropagator.getInstance().extract(Context.root(), from, GETTER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.values;

import io.opentelemetry.context.Context;
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand Down Expand Up @@ -50,5 +51,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {

OutputBuilder<T> setCausedByDrain(boolean causedByDrain);

OutputBuilder<T> setOpenTelemetryContext(@Nullable Context openTelemetryContext);

void output();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.values;

import com.google.auto.value.AutoValue;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -66,21 +67,30 @@ public T getValue() {

public abstract @Nullable Long getCurrentRecordOffset();

public abstract @Nullable Context getOpenTelemetryContext();

// todo #33176 specify additional metadata in the future
public static <T> ValueInSingleWindow<T> of(
T value,
Instant timestamp,
BoundedWindow window,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset) {
@Nullable Long currentRecordOffset,
@Nullable Context openTelemetryContext) {
return new AutoValue_ValueInSingleWindow<>(
value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset);
value,
timestamp,
window,
paneInfo,
currentRecordId,
currentRecordOffset,
openTelemetryContext);
}

public static <T> ValueInSingleWindow<T> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
return of(value, timestamp, window, paneInfo, null, null);
return of(value, timestamp, window, paneInfo, null, null, null);
}

/** A coder for {@link ValueInSingleWindow}. */
Expand All @@ -105,11 +115,12 @@ public static <T> Coder<T> of(
@Override
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream)
throws IOException {
encode(windowedElem, outStream, Context.NESTED);
encode(windowedElem, outStream, Coder.Context.NESTED);
}

@Override
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
public void encode(
ValueInSingleWindow<T> windowedElem, OutputStream outStream, Coder.Context context)
throws IOException {
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
windowCoder.encode(windowedElem.getWindow(), outStream);
Expand All @@ -120,6 +131,12 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
// todo #33176 specify additional metadata in the future
io.opentelemetry.context.Context openTelemetryContext =
windowedElem.getOpenTelemetryContext();
if (openTelemetryContext != null) {

OpenTelemetryContextPropagator.write(openTelemetryContext, builder);
}
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
Expand All @@ -129,22 +146,27 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,

@Override
public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException {
return decode(inStream, Context.NESTED);
return decode(inStream, Coder.Context.NESTED);
}

@Override
@SuppressWarnings("IgnoredPureGetter")
public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
public ValueInSingleWindow<T> decode(InputStream inStream, Coder.Context context)
throws IOException {
Instant timestamp = InstantCoder.of().decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
io.opentelemetry.context.Context openTelemetryContext = null;
if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
openTelemetryContext = OpenTelemetryContextPropagator.read(elementMetadata);
}

T value = valueCoder.decode(inStream, context);
// todo #33176 specify additional metadata in the future
return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null);
return new AutoValue_ValueInSingleWindow<>(
value, timestamp, window, paneInfo, null, null, openTelemetryContext);
}

@Override
Expand Down
Loading
Loading