Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,31 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.Map;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
* This instrumentation is responsible for capturing the span when {@link
* This instrumentation is responsible for capturing the context when {@link
* Publisher#subscribe(Subscriber)} is called. The state is then stored and will be used to
* eventually propagate on the downstream signals.
*/
@AutoService(InstrumenterModule.class)
public class PublisherInstrumentation extends InstrumenterModule.Tracing
public class PublisherInstrumentation
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {

public PublisherInstrumentation() {
super("reactive-streams", "reactive-streams-1");
}

@Override
public String hierarchyMarkerType() {
return "org.reactivestreams.Publisher";
Expand All @@ -48,24 +38,6 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named("org.reactivestreams.Publisher"));
}

@Override
public Map<String, String> contextStore() {
final Map<String, String> ret = new HashMap<>();
ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName());
ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName());
return ret;
}

@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".ReactiveStreamsAsyncResultExtension",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
Expand All @@ -79,27 +51,27 @@ public void methodAdvice(MethodTransformer transformer) {

public static class PublisherSubscribeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope onSubscribe(
public static ContextScope onSubscribe(
@Advice.This final Publisher self, @Advice.Argument(value = 0) final Subscriber s) {

final AgentSpan span =
InstrumentationContext.get(Publisher.class, AgentSpan.class).remove(self);
final AgentSpan activeSpan = activeSpan();
if (s == null || (span == null && activeSpan == null)) {
final Context context =
InstrumentationContext.get(Publisher.class, Context.class).remove(self);
final Context activeContext = Java8BytecodeBridge.getCurrentContext();
if (s == null || (context == null && activeContext == null)) {
return null;
}
final AgentSpan current =
InstrumentationContext.get(Subscriber.class, AgentSpan.class)
.putIfAbsent(s, span != null ? span : activeSpan);
final Context current =
InstrumentationContext.get(Subscriber.class, Context.class)
.putIfAbsent(s, context != null ? context : activeContext);
if (current != null) {
return activateSpan(current);
return current.attach();
}

return null;
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void afterSubscribe(@Advice.Enter final AgentScope scope) {
public static void afterSubscribe(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package datadog.trace.instrumentation.reactivestreams;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@AutoService(InstrumenterModule.class)
public final class ReactiveStreamsModule extends InstrumenterModule.ContextTracking {
public ReactiveStreamsModule() {
super("reactive-streams", "reactive-streams-1");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactiveStreamsAsyncResultExtension",
packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher",
packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber",
packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription",
};
}

@Override
public Map<String, String> contextStore() {
final Map<String, String> store = new HashMap<>();
store.put("org.reactivestreams.Subscriber", Context.class.getName());
store.put("org.reactivestreams.Publisher", Context.class.getName());
return store;
}

@Override
public List<Instrumenter> typeInstrumentations() {
return asList(new PublisherInstrumentation(), new SubscriberInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers;
import datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Collections;
import java.util.Map;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -25,12 +20,8 @@
* This instrumentation is responsible for propagating the state on the downstream signals (onNext,
* onError, onComplete).
*/
@AutoService(InstrumenterModule.class)
public class SubscriberInstrumentation extends InstrumenterModule.Tracing
public class SubscriberInstrumentation
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
public SubscriberInstrumentation() {
super("reactive-streams", "reactive-streams-1");
}

@Override
public void methodAdvice(MethodTransformer transformer) {
Expand All @@ -41,11 +32,6 @@ public void methodAdvice(MethodTransformer transformer) {
isMethod().and(named("onComplete")), getClass().getName() + "$SubscriberCompleteAdvice");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Subscriber", AgentSpan.class.getName());
}

@Override
public String hierarchyMarkerType() {
return "org.reactivestreams.Subscriber";
Expand All @@ -60,42 +46,41 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
* This advice propagate the downstream signals onNext and onError. The context on reactor is
* propagating bottom up, but we can have processing pipelines that wants to propagate the state
* downstream (i.e. state coming from the source). For this reason we allow to let the active
* scope propagate downstream if any. If missing, we'll use the one captured on subscribe.
* context propagate downstream if any. If missing, we'll use the one captured on subscribe.
*/
public static class SubscriberDownStreamAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final Subscriber self) {
if (activeSpan() != null) {
public static ContextScope before(@Advice.This final Subscriber self) {
final Context currentContext = Java8BytecodeBridge.getCurrentContext();
if (currentContext != null && currentContext != Java8BytecodeBridge.getRootContext()) {
return null;
}
final AgentSpan span =
InstrumentationContext.get(Subscriber.class, AgentSpan.class).get(self);
return span == null ? null : activateSpan(span);
final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self);
return context == null ? null : context.attach();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final AgentScope scope) {
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}

/**
* Propagates the span captured onSubscribe when the onComplete method is called. We do not let to
* propagate the active span if different from the span captured on subscribe because we need to
* ensure that late subscriptions that kicks onComplete have the right context.
* Propagates the context captured onSubscribe when the onComplete method is called. We do not let
* to propagate the active context if different from the context captured on subscribe because we
* need to ensure that late subscriptions that kicks onComplete have the right context.
*/
public static class SubscriberCompleteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final Subscriber self) {
final AgentSpan span =
InstrumentationContext.get(Subscriber.class, AgentSpan.class).get(self);
return span == null ? null : activateSpan(span);
public static ContextScope before(@Advice.This final Subscriber self) {
final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self);
return context == null ? null : context.attach();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final AgentScope scope) {
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,24 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
* This instrumentation is responsible for capturing the right state when Mono or Flux block*
* methods are called. This because the mechanism they handle this differs a bit of the standard
* {@link Publisher#subscribe(Subscriber)}
* {@link Publisher#subscribe}
*/
@AutoService(InstrumenterModule.class)
public class BlockingPublisherInstrumentation extends InstrumenterModule.Tracing
public class BlockingPublisherInstrumentation
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
public BlockingPublisherInstrumentation() {
super("reactor-core");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactorAsyncResultExtension",
};
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap("org.reactivestreams.Publisher", AgentSpan.class.getName());
}

@Override
public String hierarchyMarkerType() {
Expand All @@ -62,16 +40,16 @@ public void methodAdvice(MethodTransformer transformer) {

public static class BlockingAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final Publisher self) {
final AgentSpan span = InstrumentationContext.get(Publisher.class, AgentSpan.class).get(self);
if (span == null) {
public static ContextScope before(@Advice.This final Publisher self) {
final Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self);
if (context == null) {
return null;
}
return activateSpan(span);
return context.attach();
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void after(@Advice.Enter final AgentScope scope) {
public static void after(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package datadog.trace.instrumentation.reactor.core;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.context.Context;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import javax.annotation.Nullable;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextSpanHelper {

Expand Down Expand Up @@ -33,20 +32,20 @@ private static Class<?> findMonoWithContextClass() {
private ContextSpanHelper() {}

@Nullable
public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber<?> subscriber) {
public static Context extractContextFromSubscriberContext(final CoreSubscriber<?> subscriber) {
if (MONO_WITH_CONTEXT_CLASS == null || !MONO_WITH_CONTEXT_CLASS.isInstance(subscriber)) {
return null;
}
Context context = null;
reactor.util.context.Context reactorContext = null;
try {
context = subscriber.currentContext();
reactorContext = subscriber.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
if (reactorContext == null) {
return null;
}
if (context.hasKey(DD_SPAN_KEY)) {
Object maybeSpan = context.get(DD_SPAN_KEY);
if (reactorContext.hasKey(DD_SPAN_KEY)) {
Object maybeSpan = reactorContext.get(DD_SPAN_KEY);
if (maybeSpan instanceof WithAgentSpan) {
return ((WithAgentSpan) maybeSpan).asAgentSpan();
}
Expand Down
Loading