Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
if (baggage == null) {
return Context.current();
}
return Context.current().with(baggage);
}

private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
Expand Down Expand Up @@ -286,7 +278,6 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand Down Expand Up @@ -316,15 +307,15 @@ void recordFinishedAttempt() {

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
.record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext);
}
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute, otelContext);
.record(outboundWireSize, attribute, attemptsState.otelContext);
}
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute, otelContext);
.record(inboundWireSize, attribute, attemptsState.otelContext);
}
}
}
Expand All @@ -339,6 +330,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
private boolean callEnded;
private final String fullMethodName;
private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
private final Context otelContext;
private Status status;
private long retryDelayNanos;
private long callLatencyNanos;
Expand All @@ -356,11 +348,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory
String target,
CallOptions callOptions,
String fullMethodName,
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins, Context otelContext) {
this.module = checkNotNull(module, "module");
this.target = checkNotNull(target, "target");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.callPlugins = checkNotNull(callPlugins, "callPlugins");
this.otelContext = checkNotNull(otelContext, "otelContext");
this.attemptDelayStopwatch = module.stopwatchSupplier.get();
this.callStopWatch = module.stopwatchSupplier.get().start();

Expand All @@ -375,7 +368,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory

// Record here in case mewClientStreamTracer() would never be called.
if (module.resource.clientAttemptCountCounter() != null) {
module.resource.clientAttemptCountCounter().add(1, attribute);
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -404,7 +397,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
}
io.opentelemetry.api.common.Attributes attribute = builder.build();
if (module.resource.clientAttemptCountCounter() != null) {
module.resource.clientAttemptCountCounter().add(1, attribute);
module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
}
}
if (info.isTransparentRetry()) {
Expand Down Expand Up @@ -467,7 +460,6 @@ void callEnded(Status status, CallOptions callOptions) {
}

void recordFinishedCall(CallOptions callOptions) {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -569,6 +561,7 @@ private static final class ServerTracer extends ServerStreamTracer {
private final OpenTelemetryMetricsModule module;
private final String fullMethodName;
private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
private Context otelContext = Context.root();
private volatile boolean isGeneratedMethod;
private volatile int streamClosed;
private final Stopwatch stopwatch;
Expand All @@ -583,19 +576,31 @@ private static final class ServerTracer extends ServerStreamTracer {
this.stopwatch = module.stopwatchSupplier.get().start();
}

@Override
public io.grpc.Context filterContext(io.grpc.Context context) {
Baggage baggage = BAGGAGE_KEY.get(context);
if (baggage != null) {
otelContext = Context.current().with(baggage);
} else {
otelContext = Context.current();
}
return context;
}

@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
isGeneratedMethod = isSampledToLocalTracing;

io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attribute, otelContext);
}
}

Expand Down Expand Up @@ -627,7 +632,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand Down Expand Up @@ -678,7 +682,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
streamPlugins);
}
}

Expand Down Expand Up @@ -716,7 +721,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
OpenTelemetryMetricsModule.this, target, callOptions,
recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
callPlugins);
callPlugins, Context.current());
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
Expand All @@ -739,3 +744,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading
Loading