From a30d55944ac4e18106be01d0260969bcaed196c5 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 30 Jan 2026 11:51:52 +0100 Subject: [PATCH 1/3] move reactive-streams to context tracking --- .../PublisherInstrumentation.java | 58 +++++-------------- .../ReactiveStreamsModule.java | 41 +++++++++++++ .../SubscriberInstrumentation.java | 51 ++++++---------- 3 files changed, 74 insertions(+), 76 deletions(-) create mode 100644 dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java index eb6eaa99caf..221a9c5c369 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java @@ -3,22 +3,17 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentScope; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.util.HashMap; -import java.util.Map; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -26,18 +21,13 @@ import org.reactivestreams.Subscriber; /** - * This instrumentation is responsible for capturing the span when {@link + * This instrumentation is responsible for capturing the context when {@link * Publisher#subscribe(Subscriber)} is called. The state is then stored and will be used to * eventually propagate on the downstream signals. */ -@AutoService(InstrumenterModule.class) -public class PublisherInstrumentation extends InstrumenterModule.Tracing +public class PublisherInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public PublisherInstrumentation() { - super("reactive-streams", "reactive-streams-1"); - } - @Override public String hierarchyMarkerType() { return "org.reactivestreams.Publisher"; @@ -48,24 +38,6 @@ public ElementMatcher hierarchyMatcher() { return implementsInterface(named("org.reactivestreams.Publisher")); } - @Override - public Map contextStore() { - final Map ret = new HashMap<>(); - ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName()); - ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName()); - return ret; - } - - @Override - public String[] helperClassNames() { - return new String[] { - this.packageName + ".ReactiveStreamsAsyncResultExtension", - this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher", - this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber", - this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription", - }; - } - @Override public void methodAdvice(MethodTransformer transformer) { transformer.applyAdvice( @@ -79,27 +51,27 @@ public void methodAdvice(MethodTransformer transformer) { public static class PublisherSubscribeAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope onSubscribe( + public static ContextScope onSubscribe( @Advice.This final Publisher self, @Advice.Argument(value = 0) final Subscriber s) { - final AgentSpan span = - InstrumentationContext.get(Publisher.class, AgentSpan.class).remove(self); - final AgentSpan activeSpan = activeSpan(); - if (s == null || (span == null && activeSpan == null)) { + final Context context = + InstrumentationContext.get(Publisher.class, Context.class).remove(self); + final Context activeContext = Java8BytecodeBridge.getCurrentContext(); + if (s == null || (context == null && activeContext == null)) { return null; } - final AgentSpan current = - InstrumentationContext.get(Subscriber.class, AgentSpan.class) - .putIfAbsent(s, span != null ? span : activeSpan); + final Context current = + InstrumentationContext.get(Subscriber.class, Context.class) + .putIfAbsent(s, context != null ? context : activeContext); if (current != null) { - return activateSpan(current); + return current.attach(); } return null; } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void afterSubscribe(@Advice.Enter final AgentScope scope) { + public static void afterSubscribe(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java new file mode 100644 index 00000000000..164eec431c1 --- /dev/null +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java @@ -0,0 +1,41 @@ +package datadog.trace.instrumentation.reactivestreams; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class ReactiveStreamsModule extends InstrumenterModule.ContextTracking { + public ReactiveStreamsModule() { + super("reactive-streams", "reactive-streams-1"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".ReactiveStreamsAsyncResultExtension", + packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher", + packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber", + packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription", + }; + } + + @Override + public Map contextStore() { + final Map store = new HashMap<>(); + store.put("org.reactivestreams.Subscriber", Context.class.getName()); + store.put("org.reactivestreams.Publisher", Context.class.getName()); + return store; + } + + @Override + public List typeInstrumentations() { + return asList(new PublisherInstrumentation(), new SubscriberInstrumentation()); + } +} diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java index cb27f314393..d184e80e728 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java @@ -2,20 +2,15 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers; import datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentScope; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.util.Collections; -import java.util.Map; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -25,12 +20,8 @@ * This instrumentation is responsible for propagating the state on the downstream signals (onNext, * onError, onComplete). */ -@AutoService(InstrumenterModule.class) -public class SubscriberInstrumentation extends InstrumenterModule.Tracing +public class SubscriberInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public SubscriberInstrumentation() { - super("reactive-streams", "reactive-streams-1"); - } @Override public void methodAdvice(MethodTransformer transformer) { @@ -41,11 +32,6 @@ public void methodAdvice(MethodTransformer transformer) { isMethod().and(named("onComplete")), getClass().getName() + "$SubscriberCompleteAdvice"); } - @Override - public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Subscriber", AgentSpan.class.getName()); - } - @Override public String hierarchyMarkerType() { return "org.reactivestreams.Subscriber"; @@ -60,21 +46,21 @@ public ElementMatcher hierarchyMatcher() { * This advice propagate the downstream signals onNext and onError. The context on reactor is * propagating bottom up, but we can have processing pipelines that wants to propagate the state * downstream (i.e. state coming from the source). For this reason we allow to let the active - * scope propagate downstream if any. If missing, we'll use the one captured on subscribe. + * context propagate downstream if any. If missing, we'll use the one captured on subscribe. */ public static class SubscriberDownStreamAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before(@Advice.This final Subscriber self) { - if (activeSpan() != null) { + public static ContextScope before(@Advice.This final Subscriber self) { + final Context currentContext = Java8BytecodeBridge.getCurrentContext(); + if (currentContext != null && currentContext != Java8BytecodeBridge.getRootContext()) { return null; } - final AgentSpan span = - InstrumentationContext.get(Subscriber.class, AgentSpan.class).get(self); - return span == null ? null : activateSpan(span); + final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self); + return context == null ? null : context.attach(); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void closeScope(@Advice.Enter final AgentScope scope) { + public static void closeScope(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } @@ -82,20 +68,19 @@ public static void closeScope(@Advice.Enter final AgentScope scope) { } /** - * Propagates the span captured onSubscribe when the onComplete method is called. We do not let to - * propagate the active span if different from the span captured on subscribe because we need to - * ensure that late subscriptions that kicks onComplete have the right context. + * Propagates the context captured onSubscribe when the onComplete method is called. We do not let + * to propagate the active context if different from the context captured on subscribe because we + * need to ensure that late subscriptions that kicks onComplete have the right context. */ public static class SubscriberCompleteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before(@Advice.This final Subscriber self) { - final AgentSpan span = - InstrumentationContext.get(Subscriber.class, AgentSpan.class).get(self); - return span == null ? null : activateSpan(span); + public static ContextScope before(@Advice.This final Subscriber self) { + final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self); + return context == null ? null : context.attach(); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void closeScope(@Advice.Enter final AgentScope scope) { + public static void closeScope(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } From 8bc92c97fe88f7a861cdab72a798d52f30c4b11b Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 30 Jan 2026 12:07:15 +0100 Subject: [PATCH 2/3] Port reactor-core to context tracking --- .../BlockingPublisherInstrumentation.java | 40 ++++---------- .../reactor/core/ContextSpanHelper.java | 15 +++--- .../core/CorePublisherInstrumentation.java | 54 ++++++------------- .../core/CoreSubscriberInstrumentation.java | 32 ++++------- .../OptimizableOperatorInstrumentation.java | 33 +++--------- ...umentation.java => ReactorCoreModule.java} | 34 +++++++++++- 6 files changed, 81 insertions(+), 127 deletions(-) rename dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/{BlockScopePropagationInstrumentation.java => ReactorCoreModule.java} (52%) diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java index a64f1310995..9dd5257e902 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java @@ -3,46 +3,24 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentScope; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.util.Collections; -import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; /** * This instrumentation is responsible for capturing the right state when Mono or Flux block* * methods are called. This because the mechanism they handle this differs a bit of the standard - * {@link Publisher#subscribe(Subscriber)} + * {@link Publisher#subscribe} */ -@AutoService(InstrumenterModule.class) -public class BlockingPublisherInstrumentation extends InstrumenterModule.Tracing +public class BlockingPublisherInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public BlockingPublisherInstrumentation() { - super("reactor-core"); - } - - @Override - public String[] helperClassNames() { - return new String[] { - packageName + ".ReactorAsyncResultExtension", - }; - } - - @Override - public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", AgentSpan.class.getName()); - } @Override public String hierarchyMarkerType() { @@ -62,16 +40,16 @@ public void methodAdvice(MethodTransformer transformer) { public static class BlockingAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before(@Advice.This final Publisher self) { - final AgentSpan span = InstrumentationContext.get(Publisher.class, AgentSpan.class).get(self); - if (span == null) { + public static ContextScope before(@Advice.This final Publisher self) { + final Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self); + if (context == null) { return null; } - return activateSpan(span); + return context.attach(); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void after(@Advice.Enter final AgentScope scope) { + public static void after(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java index 4f7e075f49f..bf11f50b640 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java @@ -1,11 +1,10 @@ package datadog.trace.instrumentation.reactor.core; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.context.Context; import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; import javax.annotation.Nullable; import reactor.core.CoreSubscriber; import reactor.core.publisher.Mono; -import reactor.util.context.Context; public class ContextSpanHelper { @@ -33,20 +32,20 @@ private static Class findMonoWithContextClass() { private ContextSpanHelper() {} @Nullable - public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber subscriber) { + public static Context extractContextFromSubscriberContext(final CoreSubscriber subscriber) { if (MONO_WITH_CONTEXT_CLASS == null || !MONO_WITH_CONTEXT_CLASS.isInstance(subscriber)) { return null; } - Context context = null; + reactor.util.context.Context reactorContext = null; try { - context = subscriber.currentContext(); + reactorContext = subscriber.currentContext(); } catch (Throwable ignored) { } - if (context == null) { + if (reactorContext == null) { return null; } - if (context.hasKey(DD_SPAN_KEY)) { - Object maybeSpan = context.get(DD_SPAN_KEY); + if (reactorContext.hasKey(DD_SPAN_KEY)) { + Object maybeSpan = reactorContext.get(DD_SPAN_KEY); if (maybeSpan instanceof WithAgentSpan) { return ((WithAgentSpan) maybeSpan).asAgentSpan(); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java index 8b2a7a02a9f..6e243debc2e 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java @@ -4,21 +4,16 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; -import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext; +import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractContextFromSubscriberContext; import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentScope; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.util.HashMap; -import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -26,12 +21,8 @@ import org.reactivestreams.Subscriber; import reactor.core.CoreSubscriber; -@AutoService(InstrumenterModule.class) -public class CorePublisherInstrumentation extends InstrumenterModule.Tracing +public class CorePublisherInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public CorePublisherInstrumentation() { - super("reactor-core"); - } @Override public String hierarchyMarkerType() { @@ -47,21 +38,6 @@ public ElementMatcher hierarchyMatcher() { "reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))); // < 3.1.7 } - @Override - public Map contextStore() { - final Map ret = new HashMap<>(); - ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName()); - ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName()); - return ret; - } - - @Override - public String[] helperClassNames() { - return new String[] { - packageName + ".ContextSpanHelper", - }; - } - @Override public void methodAdvice(MethodTransformer transformer) { transformer.applyAdvice( @@ -74,23 +50,25 @@ public void methodAdvice(MethodTransformer transformer) { public static class PropagateContextSpanOnSubscribe { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before( + public static ContextScope before( @Advice.This Publisher self, @Advice.Argument(0) final CoreSubscriber subscriber) { - final AgentSpan span = extractSpanFromSubscriberContext(subscriber); + final Context context = extractContextFromSubscriberContext(subscriber); - if (span != null) { - // we force storing the span state linked to publisher and subscriber to the one explicitly - // present in the context so that, if PublisherInstrumentation is kicking in after this - // advice, it won't override that active span - InstrumentationContext.get(Publisher.class, AgentSpan.class).put(self, span); - InstrumentationContext.get(Subscriber.class, AgentSpan.class).put(subscriber, span); - return activateSpan(span); + if (context != null) { + // we force storing the context state linked to publisher and subscriber to the one + // explicitly + // present in the reactor context so that, if PublisherInstrumentation is kicking in after + // this + // advice, it won't override that active context + InstrumentationContext.get(Publisher.class, Context.class).put(self, context); + InstrumentationContext.get(Subscriber.class, Context.class).put(subscriber, context); + return context.attach(); } return null; } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void after(@Advice.Enter final AgentScope scope) { + public static void after(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java index 269072b86de..75152611f80 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java @@ -3,25 +3,18 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; -import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext; +import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractContextFromSubscriberContext; -import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.instrumentation.api.AgentScope; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.core.CoreSubscriber; -@AutoService(InstrumenterModule.class) -public class CoreSubscriberInstrumentation extends InstrumenterModule.Tracing +public class CoreSubscriberInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public CoreSubscriberInstrumentation() { - super("reactor-core"); - } @Override public String hierarchyMarkerType() { @@ -33,13 +26,6 @@ public ElementMatcher hierarchyMatcher() { return implementsInterface(named(hierarchyMarkerType())); } - @Override - public String[] helperClassNames() { - return new String[] { - packageName + ".ContextSpanHelper", - }; - } - @Override public void methodAdvice(MethodTransformer transformer) { transformer.applyAdvice( @@ -49,16 +35,16 @@ public void methodAdvice(MethodTransformer transformer) { public static class PropagateSpanInScopeAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope before(@Advice.This final CoreSubscriber self) { - final AgentSpan span = extractSpanFromSubscriberContext(self); - if (span != null) { - return activateSpan(span); + public static ContextScope before(@Advice.This final CoreSubscriber self) { + final Context context = extractContextFromSubscriberContext(self); + if (context != null) { + return context.attach(); } return null; } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void after(@Advice.Enter final AgentScope scope) { + public static void after(@Advice.Enter final ContextScope scope) { if (scope != null) { scope.close(); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java index c25724bf4ae..747216f2402 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java @@ -7,13 +7,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import com.google.auto.service.AutoService; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import java.util.HashMap; -import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -21,18 +17,13 @@ import org.reactivestreams.Subscriber; /** - * This instrumentation is responsible for transferring the {@link AgentSpan} when subscription + * This instrumentation is responsible for transferring the {@link Context} when subscription * optimization are made. In particular reactor's OptimizableOperators can do subscription via a * loop call instead of recursion. */ -@AutoService(InstrumenterModule.class) -public class OptimizableOperatorInstrumentation extends InstrumenterModule.Tracing +public class OptimizableOperatorInstrumentation implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - public OptimizableOperatorInstrumentation() { - super("reactor-core"); - } - @Override public String hierarchyMarkerType() { return "reactor.core.publisher.OptimizableOperator"; @@ -43,14 +34,6 @@ public ElementMatcher hierarchyMatcher() { return implementsInterface(named(hierarchyMarkerType())); } - @Override - public Map contextStore() { - final Map ret = new HashMap<>(); - ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName()); - ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName()); - return ret; - } - @Override public void methodAdvice(MethodTransformer transformer) { transformer.applyAdvice( @@ -70,12 +53,12 @@ public static void onSubscribe( if (s == null || arg == null) { return; } - AgentSpan span = InstrumentationContext.get(Publisher.class, AgentSpan.class).get(self); - if (span == null) { - span = InstrumentationContext.get(Subscriber.class, AgentSpan.class).get(arg); + Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self); + if (context == null) { + context = InstrumentationContext.get(Subscriber.class, Context.class).get(arg); } - if (span != null) { - InstrumentationContext.get(Subscriber.class, AgentSpan.class).putIfAbsent(s, span); + if (context != null) { + InstrumentationContext.get(Subscriber.class, Context.class).putIfAbsent(s, context); } } } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockScopePropagationInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java similarity index 52% rename from dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockScopePropagationInstrumentation.java rename to dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java index 67fb0d1abf2..9073903a8c4 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockScopePropagationInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java @@ -1,21 +1,42 @@ package datadog.trace.instrumentation.reactor.core; +import static java.util.Arrays.asList; + import com.google.auto.service.AutoService; +import datadog.context.Context; import datadog.trace.agent.tooling.ExcludeFilterProvider; +import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; @AutoService(InstrumenterModule.class) -public class BlockScopePropagationInstrumentation extends InstrumenterModule.Tracing +public final class ReactorCoreModule extends InstrumenterModule.ContextTracking implements ExcludeFilterProvider { - public BlockScopePropagationInstrumentation() { + public ReactorCoreModule() { super("reactor-core"); } + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".ReactorAsyncResultExtension", packageName + ".ContextSpanHelper", + }; + } + + @Override + public Map contextStore() { + final Map store = new HashMap<>(); + store.put("org.reactivestreams.Subscriber", Context.class.getName()); + store.put("org.reactivestreams.Publisher", Context.class.getName()); + return store; + } + @Override public Map> excludedClasses() { return Collections.singletonMap( @@ -28,4 +49,13 @@ public BlockScopePropagationInstrumentation() { "reactor.core.publisher.WorkQueueProcessor$WorkQueueInner", "reactor.core.publisher.WorkQueueProcessor$WorkQueueInner$1")); } + + @Override + public List typeInstrumentations() { + return asList( + new BlockingPublisherInstrumentation(), + new CorePublisherInstrumentation(), + new CoreSubscriberInstrumentation(), + new OptimizableOperatorInstrumentation()); + } } From af772d518335e652fb827cbe740d7a2799ca58b2 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 30 Jan 2026 14:28:01 +0100 Subject: [PATCH 3/3] Update instrumentation context access of instrumentation coupled to reactivestreams --- .../resilience4j/CircuitBreakerOperatorInstrumentation.java | 5 +++-- .../resilience4j/FallbackOperatorInstrumentation.java | 4 ++-- .../resilience4j/Resilience4jReactorModule.java | 4 ++-- .../resilience4j/RetryOperatorInstrumentation.java | 4 ++-- .../springwebflux/server/DispatcherHandlerAdvice.java | 3 ++- .../server/DispatcherHandlerInstrumentation.java | 4 ++-- .../springwebflux/server/HandleResultAdvice.java | 3 ++- .../springwebflux/server/HandlerAdapterAdvice.java | 3 ++- .../springwebflux/server/HandlerAdapterInstrumentation.java | 4 ++-- 9 files changed, 19 insertions(+), 15 deletions(-) diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java index 915aa141061..8f9ac96a2b6 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java @@ -4,9 +4,9 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; @@ -40,7 +40,8 @@ public static void after( result, CircuitBreakerDecorator.DECORATE, circuitBreaker, - InstrumentationContext.get(Publisher.class, AgentSpan.class)::put); + // this needs to be separated from tracing + InstrumentationContext.get(Publisher.class, Context.class)::put); } } } diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java index 8b8aaf09733..8a87951497a 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java @@ -5,9 +5,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import io.github.resilience4j.core.functions.CheckedSupplier; import java.util.function.Function; import net.bytebuddy.asm.Advice; @@ -40,7 +40,7 @@ public static void after( result = ReactorHelper.wrapFunction( - result, InstrumentationContext.get(Publisher.class, AgentSpan.class)::putIfAbsent); + result, InstrumentationContext.get(Publisher.class, Context.class)::putIfAbsent); } // 2.0.0+ diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java index 44ec64e72c1..f1d3266c1da 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java @@ -1,9 +1,9 @@ package datadog.trace.instrumentation.resilience4j; import com.google.auto.service.AutoService; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -33,7 +33,7 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", AgentSpan.class.getName()); + return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); } @Override diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java index 9cd7c428235..d740571b9ae 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java @@ -4,9 +4,9 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import io.github.resilience4j.retry.Retry; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; @@ -39,7 +39,7 @@ public static void after( result, RetryDecorator.DECORATE, retry, - InstrumentationContext.get(Publisher.class, AgentSpan.class)::put); + InstrumentationContext.get(Publisher.class, Context.class)::put); } } } diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java index e821d45015a..f599e0321a1 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java @@ -6,6 +6,7 @@ import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE; import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DISPATCHER_HANDLE_HANDLER; +import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -50,7 +51,7 @@ public static void methodExit( final AgentSpan span = scope.span(); final Consumer finisher = new AdviceUtils.MonoSpanFinisher(span); mono = mono.doOnError(finisher).doFinally(finisher); - InstrumentationContext.get(Publisher.class, AgentSpan.class).put(mono, span); + InstrumentationContext.get(Publisher.class, Context.class).put(mono, span); } scope.close(); // span finished in MonoSpanFinisher diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java index 0cdd73e7287..9488b2e6950 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java @@ -7,9 +7,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import java.util.Collections; import java.util.Map; @@ -24,7 +24,7 @@ public String instrumentedType() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", AgentSpan.class.getName()); + return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); } @Override diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java index f013e8c1d98..52fe8d67558 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.springwebflux.server; +import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import net.bytebuddy.asm.Advice; @@ -14,7 +15,7 @@ public static void methodExit( @Advice.Return(readOnly = false) Mono mono) { final AgentSpan span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE); if (span != null && mono != null) { - InstrumentationContext.get(Publisher.class, AgentSpan.class).put(mono, span); + InstrumentationContext.get(Publisher.class, Context.class).put(mono, span); } } } diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java index ba7e3e2e446..592a594e624 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java @@ -5,6 +5,7 @@ import static datadog.trace.instrumentation.springwebflux.server.AdviceUtils.constructOperationName; import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE; +import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -71,7 +72,7 @@ public static void methodExit( if (throwable != null) { DECORATE.onError(scope, throwable); } else if (mono != null) { - InstrumentationContext.get(Publisher.class, AgentSpan.class).put(mono, scope.span()); + InstrumentationContext.get(Publisher.class, Context.class).put(mono, scope.span()); } scope.close(); // span finished in SpanFinishingSubscriber diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java index 432b8fead16..f3196983e0a 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java @@ -9,9 +9,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; +import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import java.util.Collections; import java.util.Map; import net.bytebuddy.description.type.TypeDescription; @@ -33,7 +33,7 @@ public ElementMatcher hierarchyMatcher() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", AgentSpan.class.getName()); + return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); } @Override