diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 31fafa8f3..cc03521bc 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -5,9 +5,11 @@ import { XrayService } from "../xray-service"; import { AppSyncEventTraceExtractor, CustomTraceExtractor, + DurableExecutionEventTraceExtractor, EventBridgeEventTraceExtractor, EventBridgeSQSEventTraceExtractor, HTTPEventTraceExtractor, + isDurableExecutionEvent, KinesisEventTraceExtractor, LambdaContextTraceExtractor, SNSEventTraceExtractor, @@ -56,6 +58,9 @@ export class TraceContextExtractor { } } + // No stripping needed — trace context is stored in dedicated + // `_datadog_{N}` checkpoint operations. + if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); if (this.stepFunctionContextService?.context) { @@ -81,6 +86,9 @@ export class TraceContextExtractor { private getTraceEventExtractor(event: any): EventTraceExtractor | undefined { if (!event || typeof event !== "object") return; + // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper); + const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext); diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts new file mode 100644 index 000000000..8a7f8e6ed --- /dev/null +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -0,0 +1,65 @@ +import { DurableExecutionEventTraceExtractor } from "./durable-execution"; +import { TracerWrapper } from "../../tracer-wrapper"; + +function makeTracerWrapper(datadogOnlyReturn: any = null): TracerWrapper { + return { + extractDatadogOnly: jest.fn().mockReturnValue(datadogOnlyReturn), + } as unknown as TracerWrapper; +} + +describe("DurableExecutionEventTraceExtractor", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("extracts checkpoint headers with the datadog-only propagator", () => { + const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + + const checkpointHeaders = { + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }; + + const event = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-1", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify(checkpointHeaders), + }, + }, + ], + }, + }; + + const sentinelContext = { sentinel: true }; + const tracerWrapper = makeTracerWrapper(sentinelContext); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + const context = extractor.extract(event); + + // Checkpoints are written by dd-trace-js in Datadog style only — extract + // must use the matching forced-datadog propagator, not the user-configured one. + expect(tracerWrapper.extractDatadogOnly).toHaveBeenCalledWith(checkpointHeaders); + expect(context).toBe(sentinelContext); + }); + + it("returns null when no checkpoint exists", () => { + const tracerWrapper = makeTracerWrapper(); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + + const context = extractor.extract({ + DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", + CheckpointToken: "t-empty", + InitialExecutionState: { Operations: [] }, + }); + + expect(context).toBeNull(); + expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled(); + }); +}); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts new file mode 100644 index 000000000..7e5326cf9 --- /dev/null +++ b/src/trace/context/extractors/durable-execution.ts @@ -0,0 +1,258 @@ +/** + * Durable Execution Trace Extractor — Checkpoint Approach + * + * Strategy: + * 1. Look for trace context in the latest `_datadog_{N}` checkpoint. + * 2. If no trace checkpoint exists, return null and let the default extraction + * path create the context. + * + * The extracted context becomes the parent of the `aws.lambda` span (and any + * downstream spans created by dd-trace-js, including `aws.durable.execute`). + * This integration no longer creates a separate root span — anchoring to the + * first `aws.durable.execute` span in dd-trace-js is the canonical entry point + * for a durable execution. + * + * The dd-trace-js plugin writes checkpoint headers in **Datadog style only** + * (regardless of `DD_TRACE_PROPAGATION_STYLE_INJECT`), so we extract them with + * a matching forced-datadog propagator via `TracerWrapper.extractDatadogOnly`. + */ + +import { logDebug } from "../../../utils"; +import { SpanContextWrapper } from "../../span-context-wrapper"; +import { TracerWrapper } from "../../tracer-wrapper"; +import { EventTraceExtractor } from "../extractor"; + +/** + * Interface for operation data in durable execution state + */ +export interface DurableExecutionOperation { + Id: string; + Status: string; + Type?: string; + Name?: string; + ExecutionDetails?: { + InputPayload?: string; + }; + StepDetails?: { + Result?: string; + Error?: unknown; + NextAttemptTimestamp?: string; + }; + Payload?: string; + CallbackDetails?: { + Result?: string; + CallbackId?: string; + Error?: unknown; + }; + StartedAt?: string; + StartTimestamp?: number; + CompletedAt?: string; +} + +/** + * Interface for initial execution state in durable execution events + */ +export interface InitialExecutionState { + Operations?: DurableExecutionOperation[]; + Status?: string; +} + +/** + * Interface for durable execution event + */ +export interface DurableExecutionEvent { + DurableExecutionArn?: string; + CheckpointToken?: string; + InitialExecutionState?: InitialExecutionState; + Input?: unknown; +} + +/** + * Check if event is a durable execution event + */ +export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent { + if (!event || typeof event !== "object") { + return false; + } + + const maybeEvent = event as Record; + return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken); +} + +/** + * Check if this is a replay invocation (has previous operations) + */ +export function isDurableExecutionReplay(event: unknown): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + return Array.isArray(operations) && operations.length > 0; +} + +/** + * Get durable execution ARN from event + */ +export function getDurableExecutionArn(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.DurableExecutionArn; +} + +/** + * Get checkpoint token from event + */ +export function getCheckpointToken(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.CheckpointToken; +} + +// Terminal operation statuses that indicate an operation has completed +const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); + +const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; + +function parseTraceCheckpointNumber(name: unknown): number | null { + if (typeof name !== "string") return null; + + if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) return null; + return n; +} + +/** + * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return + * its parsed header dict. + * + * Each invocation that changes trace context saves a new checkpoint with N+1; + * the one with the highest N is the most recent. Headers are written by the + * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the + * payload is a standard HTTP-style header dict. + * + */ +function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + let best: { number: number; op: DurableExecutionOperation } | null = null; + for (const op of operations) { + const n = parseTraceCheckpointNumber(op?.Name); + if (n === null) continue; + if (best === null || n > best.number) { + best = { number: n, op }; + } + } + if (best === null) return null; + + const raw = best.op.Payload ?? best.op.StepDetails?.Result; + if (!raw || typeof raw !== "string") return null; + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object") { + return parsed as Record; + } + } catch (e) { + logDebug(`Failed to parse trace checkpoint payload: ${e}`); + } + return null; +} + + +export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + constructor(private tracerWrapper: TracerWrapper) {} + + extract(event: unknown): SpanContextWrapper | null { + if (!isDurableExecutionEvent(event)) { + logDebug("Event is not a durable execution event"); + return null; + } + if (!event.DurableExecutionArn) { + logDebug("No DurableExecutionArn in event"); + return null; + } + + const checkpointHeaders = findLatestCheckpointHeaders(event); + if (checkpointHeaders) { + logDebug("Extracting trace context from durable checkpoint (datadog-only)"); + return this.tracerWrapper.extractDatadogOnly(checkpointHeaders); + } + + logDebug("No durable trace context found; deferring to default extraction"); + return null; + } +} + +/** + * Utility to check if a durable operation is a replay + * + * An operation is a replay if it exists in the initial execution state + * with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT) + * + * @param event - Lambda event + * @param stepId - The step ID to check (may be hashed) + * @returns true if the operation is a replay + */ +export function isOperationReplay(event: unknown, stepId: string): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) { + return false; + } + + const operation = operations.find((op) => op.Id === stepId); + if (!operation) { + return false; + } + + return TERMINAL_STATUSES.has(operation.Status); +} + +/** + * Get the replay status of an operation + * + * @param event - Lambda event + * @param stepId - The step ID to check + * @returns Operation status if found, undefined otherwise + */ +export function getOperationStatus(event: unknown, stepId: string): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return undefined; + } + + const operation = operations.find((op) => op.Id === stepId); + return operation?.Status; +} + +/** + * Count the number of completed operations in the event + * + * @param event - Lambda event + * @returns Number of completed operations + */ +export function getCompletedOperationCount(event: unknown): number { + if (!isDurableExecutionEvent(event)) { + return 0; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return 0; + } + + return operations.filter((op) => op.Status === "SUCCEEDED" || op.Status === "FAILED").length; +} + diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 6bd690713..49e097f0f 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -9,3 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; +export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay } from "./durable-execution"; diff --git a/src/trace/tracer-wrapper.ts b/src/trace/tracer-wrapper.ts index 1d02b5caa..c93232b65 100644 --- a/src/trace/tracer-wrapper.ts +++ b/src/trace/tracer-wrapper.ts @@ -27,6 +27,11 @@ export interface TraceOptions { // This lets a customer bring their own version of the tracer. export class TracerWrapper { private tracer: any; + // Cached propagator that extracts only Datadog-style headers (`x-datadog-*`) + // regardless of `DD_TRACE_PROPAGATION_STYLE_EXTRACT`. Built lazily on first + // datadog-only extract because the underlying tracer config is mutated + // during dd-trace init and only stable after the user's handler is invoked. + private datadogOnlyPropagator: any | null | undefined; constructor() { try { @@ -67,6 +72,68 @@ export class TracerWrapper { return spanContext; } + /** + * Extract a span context from a header dict using **only** the Datadog + * propagation style, ignoring `DD_TRACE_PROPAGATION_STYLE_EXTRACT`. + * + * Use this for carriers we know are written by Datadog code (e.g. the + * `_datadog_{N}` trace-context checkpoints written by the + * datadog-plugin-aws-durable-execution-sdk-js plugin). For carriers + * originating from arbitrary upstream services, use `extract` instead so + * the user's propagation-style configuration is honored. + */ + public extractDatadogOnly(headers: any): SpanContextWrapper | null { + if (!this.isTracerAvailable) { + return null; + } + + const propagator = this.getOrBuildDatadogOnlyPropagator(); + if (!propagator) { + // Fallback: if we couldn't reach into dd-trace internals to build a + // forced-datadog propagator, defer to the standard extract. This is + // strictly less correct (honors user style) but better than dropping + // the context entirely. + return this.extract(headers); + } + + try { + const extractedSpanContext = propagator.extract(headers); + if (!extractedSpanContext) return null; + return new SpanContextWrapper(extractedSpanContext, TraceSource.Event); + } catch (err) { + if (err instanceof Object || err instanceof Error) { + logDebug("Datadog-only extract failed", err); + } + return null; + } + } + + private getOrBuildDatadogOnlyPropagator(): any | null { + if (this.datadogOnlyPropagator !== undefined) return this.datadogOnlyPropagator; + this.datadogOnlyPropagator = null; + try { + const innerTracer = this.tracer?._tracer; + const config = innerTracer?._config; + const existing = innerTracer?._propagators?.text_map; + if (!config || !existing) return null; + // Shadow tracePropagationStyle.extract while inheriting every other + // field (baggage limits, x-datadog-tags length cap, etc.). + const shadow = Object.create(config); + shadow.tracePropagationStyle = { + ...config.tracePropagationStyle, + extract: ["datadog"], + }; + // Build a sibling TextMapPropagator via the live propagator's constructor + // so we don't have to bind to a dd-trace internal module path. + this.datadogOnlyPropagator = new existing.constructor(shadow); + } catch (err) { + if (err instanceof Object || err instanceof Error) { + logDebug("Failed to build datadog-only propagator", err); + } + } + return this.datadogOnlyPropagator; + } + public wrap any>(name: string, options: TraceOptions, fn: T) { if (!this.isTracerAvailable) { return fn;