From 4e1078cd077c296dcf48720520bcea0b9eb7a218 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:29:39 -0400 Subject: [PATCH 01/15] initial working version --- src/trace/context/extractor.ts | 32 + .../extractors/durable-execution.spec.ts | 137 +++ .../context/extractors/durable-execution.ts | 890 ++++++++++++++++++ src/trace/context/extractors/index.ts | 1 + src/trace/listener.ts | 41 + 5 files changed, 1101 insertions(+) create mode 100644 src/trace/context/extractors/durable-execution.spec.ts create mode 100644 src/trace/context/extractors/durable-execution.ts diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 31fafa8f3..233887308 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -5,9 +5,11 @@ import { XrayService } from "../xray-service"; import { AppSyncEventTraceExtractor, CustomTraceExtractor, + DurableExecutionEventTraceExtractor, EventBridgeEventTraceExtractor, EventBridgeSQSEventTraceExtractor, HTTPEventTraceExtractor, + isDurableExecutionEvent, KinesisEventTraceExtractor, LambdaContextTraceExtractor, SNSEventTraceExtractor, @@ -44,6 +46,12 @@ export class TraceContextExtractor { async extract(event: any, context: Context): Promise { let spanContext: SpanContextWrapper | null = null; + const durableTraceDebugEnabled = (() => { + const value = process.env.DD_DURABLE_TRACE_DEBUG; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; + })(); if (this.config.traceExtractor) { const customExtractor = new CustomTraceExtractor(this.config.traceExtractor); spanContext = await customExtractor.extract(event, context); @@ -53,9 +61,21 @@ export class TraceContextExtractor { const eventExtractor = this.getTraceEventExtractor(event); if (eventExtractor !== undefined) { spanContext = eventExtractor.extract(event); + if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { + console.log("[dd-lambda][durable-trace] Event extractor result", { + extractor: eventExtractor.constructor?.name, + extracted: spanContext ? { + traceId: spanContext.toTraceId(), + parentId: spanContext.toSpanId(), + sampleMode: spanContext.sampleMode(), + } : null, + }); + } } } + // No stripping needed — deterministic approach never modifies checkpoint payloads. + if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); if (this.stepFunctionContextService?.context) { @@ -67,6 +87,15 @@ export class TraceContextExtractor { if (spanContext === null) { const contextExtractor = new LambdaContextTraceExtractor(this.tracerWrapper); spanContext = contextExtractor.extract(context); + if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { + console.log("[dd-lambda][durable-trace] Falling back to Lambda context extraction", { + extracted: spanContext ? { + traceId: spanContext.toTraceId(), + parentId: spanContext.toSpanId(), + sampleMode: spanContext.sampleMode(), + } : null, + }); + } } if (spanContext !== null) { @@ -81,6 +110,9 @@ export class TraceContextExtractor { private getTraceEventExtractor(event: any): EventTraceExtractor | undefined { if (!event || typeof event !== "object") return; + // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(); + const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext); diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts new file mode 100644 index 000000000..70b2adc2e --- /dev/null +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -0,0 +1,137 @@ +import { createHash } from "crypto"; +import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; + +jest.mock("dd-trace", () => ({ + startSpan: jest.fn(), +})); + +function deterministicRootSpanId(executionArn: string): string { + const hash = createHash("sha256").update(`durable-root:${executionArn}`).digest("hex"); + const masked = BigInt(`0x${hash}`) & 0x7fffffffffffffffn; + return masked === 0n ? "1" : masked.toString(10); +} + +describe("DurableExecutionEventTraceExtractor", () => { + const tracer = require("dd-trace"); + const startSpanMock = tracer.startSpan as jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("extracts a deterministic durable root span id from executionArn", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + + const event = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-1", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_dd_trace_context_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }), + }, + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const context = extractor.extract(event); + + expect(context).not.toBeNull(); + expect(context?.toTraceId()).toBe("149750110124521191"); + expect(context?.toSpanId()).toBe("987654321012345678"); + }); + + it("creates durable root span only for first invocation", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; + + const spanContext: any = { + _spanId: null, + _parentId: null, + toTraceId: () => "1111111111111111111", + toSpanId: () => "2222222222222222222", + }; + const span = { + context: () => spanContext, + finish: jest.fn(), + }; + startSpanMock.mockReturnValue(span); + + const firstInvocationEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-first", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + ExecutionDetails: { + InputPayload: JSON.stringify({ hello: "world" }), + }, + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const extracted = extractor.extract(firstInvocationEvent); + + const root = createDurableExecutionRootSpan(firstInvocationEvent, extracted); + + expect(root).not.toBeNull(); + expect(startSpanMock).toHaveBeenCalledTimes(1); + expect(root?.span.context()._spanId.toString(10)).toBe("2222222222222222222"); + }); + + it("skips durable root span creation on replay invocations", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; + + const replayEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-replay", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_dd_trace_context_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "538591322263933970", + "x-datadog-sampling-priority": "1", + }), + }, + }, + { + Id: "op-2", + Name: "callback_step_prepare", + Status: "SUCCEEDED", + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const extracted = extractor.extract(replayEvent); + const root = createDurableExecutionRootSpan(replayEvent, extracted); + + expect(root).toBeNull(); + expect(startSpanMock).not.toHaveBeenCalled(); + }); +}); + diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts new file mode 100644 index 000000000..88b9f5bdf --- /dev/null +++ b/src/trace/context/extractors/durable-execution.ts @@ -0,0 +1,890 @@ +/** + * Durable Execution Trace Extractor — Deterministic Approach + * + * Generates deterministic trace context from AWS Lambda Durable Execution events + * using SHA-256 hashing of the execution ARN and operation identifiers. + * + * Strategy: + * 1. First, try to extract a real trace context from the original customer event + * (stored in Operations[0].ExecutionDetails.InputPayload). If found, the durable + * execution trace connects to the upstream caller's trace. + * 2. If no upstream context exists, fall back to deterministic hashing from the + * execution ARN, generating a full 128-bit trace ID (lower 64 bits + _dd.p.tid) + * for W3C/OpenTelemetry compatibility. + * + * Flow: + * 1. Every invocation receives the same DurableExecutionArn + * 2. The original customer event is stored in Operations[0].ExecutionDetails.InputPayload + * and is identical across all invocations — any upstream trace headers persist + * 3. trace_id = real upstream or hash("durable-trace:{arn}") lower 64 bits + * 4. _dd.p.tid = real upstream or hash("durable-trace:{arn}") upper 64 bits + * 5. parent_id = hash("durable-span:{arn}#{last_completed_op_id}") — links to previous invocation + * 6. Checkpoint payloads are never modified + */ + +import { createHash, randomBytes } from "crypto"; +import { logDebug } from "../../../utils"; +import { SpanContextWrapper } from "../../span-context-wrapper"; +import { SampleMode, TraceSource } from "../../trace-context-service"; +import { EventTraceExtractor } from "../extractor"; + +const DURABLE_TRACE_DEBUG_ENV = "DD_DURABLE_TRACE_DEBUG"; + +function durableTraceDebugEnabled(): boolean { + const value = process.env[DURABLE_TRACE_DEBUG_ENV]; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; +} + +function durableTraceDebugLog(message: string, details?: Record): void { + if (!durableTraceDebugEnabled()) return; + if (details) { + console.log(`[dd-lambda][durable-trace] ${message}`, details); + return; + } + console.log(`[dd-lambda][durable-trace] ${message}`); +} + +function parseTraceparentHex( + traceparent: unknown, +): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { + if (typeof traceparent !== "string") return null; + const parts = traceparent.split("-"); + if (parts.length !== 4) return null; + const [, traceIdHex, parentIdHex] = parts; + if (!/^[0-9a-f]{32}$/i.test(traceIdHex) || !/^[0-9a-f]{16}$/i.test(parentIdHex)) { + return null; + } + + const lower64TraceIdHex = traceIdHex.slice(16); + const upper64TraceIdHex = traceIdHex.slice(0, 16); + + try { + return { + traceIdHex, + parentIdHex, + lower64TraceIdDec: BigInt(`0x${lower64TraceIdHex}`).toString(10), + upper64TraceIdHex, + parentIdDec: BigInt(`0x${parentIdHex}`).toString(10), + }; + } catch { + return null; + } +} + +function normalizeParentIdToDecimal(parentId: unknown): string | null { + if (typeof parentId !== "string") return null; + const value = parentId.trim(); + if (!value) return null; + + if (/^[0-9]+$/.test(value)) { + return value; + } + + if (/^[0-9a-f]+$/i.test(value)) { + const hex = value.length > 16 ? value.slice(-16) : value; + try { + return BigInt(`0x${hex}`).toString(10); + } catch { + return null; + } + } + + return null; +} + +function normalizeTraceIdToDecimal( + traceId: unknown, +): { traceId: string | null; ptidFromTraceId?: string } { + if (typeof traceId !== "string") { + return { traceId: null }; + } + + const value = traceId.trim(); + if (!value) { + return { traceId: null }; + } + + if (/^[0-9]+$/.test(value)) { + return { traceId: value }; + } + + if (/^[0-9a-f]+$/i.test(value)) { + // If a 128-bit hex trace ID was accidentally put here, split it like traceparent: + // lower 64 bits for Datadog trace_id, upper 64 bits for _dd.p.tid. + if (value.length > 16) { + const upperHex = value.slice(-32, -16).padStart(16, "0"); + const lowerHex = value.slice(-16); + try { + return { + traceId: BigInt(`0x${lowerHex}`).toString(10), + ptidFromTraceId: upperHex.toLowerCase(), + }; + } catch { + return { traceId: null }; + } + } + + try { + return { + traceId: BigInt(`0x${value}`).toString(10), + }; + } catch { + return { traceId: null }; + } + } + + return { traceId: null }; +} + +/** + * Interface for operation data in durable execution state + */ +export interface DurableExecutionOperation { + Id: string; + Status: string; + Type?: string; + Name?: string; + ExecutionDetails?: { + InputPayload?: string; + }; + StepDetails?: { + Result?: string; + Error?: unknown; + NextAttemptTimestamp?: string; + }; + Payload?: string; + CallbackDetails?: { + Result?: string; + CallbackId?: string; + Error?: unknown; + }; + StartedAt?: string; + StartTimestamp?: number; + CompletedAt?: string; +} + +/** + * Interface for initial execution state in durable execution events + */ +export interface InitialExecutionState { + Operations?: DurableExecutionOperation[]; + Status?: string; +} + +/** + * Interface for durable execution event + */ +export interface DurableExecutionEvent { + DurableExecutionArn?: string; + CheckpointToken?: string; + InitialExecutionState?: InitialExecutionState; + Input?: unknown; +} + +/** + * Check if event is a durable execution event + */ +export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent { + if (!event || typeof event !== "object") { + return false; + } + + const maybeEvent = event as Record; + return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken); +} + +/** + * Check if this is a replay invocation (has previous operations) + */ +export function isDurableExecutionReplay(event: unknown): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + return Array.isArray(operations) && operations.length > 0; +} + +/** + * Get durable execution ARN from event + */ +export function getDurableExecutionArn(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.DurableExecutionArn; +} + +/** + * Get checkpoint token from event + */ +export function getCheckpointToken(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.CheckpointToken; +} + +function generateRandomPositiveId(): string { + const bytes = randomBytes(8); + bytes[0] = bytes[0] & 0x7f; // keep it positive int64 + const value = bufferToBigInt(bytes); + return value === 0n ? "1" : value.toString(10); +} + +function generateRandomTraceId128(): { traceId: string; ptid: string } { + const bytes = randomBytes(16); + + // Upper 64 bits -> _dd.p.tid + const upperBytes = Buffer.from(bytes.subarray(0, 8)); + const upperValue = bufferToBigInt(upperBytes); + const ptid = (upperValue === 0n ? 1n : upperValue).toString(16).padStart(16, "0"); + + // Lower 64 bits -> Datadog trace_id (decimal) + const lowerBytes = Buffer.from(bytes.subarray(8, 16)); + lowerBytes[0] = lowerBytes[0] & 0x7f; // keep positive int64 + const lowerValue = bufferToBigInt(lowerBytes); + const traceId = lowerValue === 0n ? "1" : lowerValue.toString(10); + + return { traceId, ptid }; +} + +function bufferToBigInt(buf: Buffer): bigint { + let result = 0n; + for (let i = 0; i < buf.length; i++) { + result = (result << 8n) | BigInt(buf[i]); + } + return result; +} + +// Terminal operation statuses that indicate an operation has completed +const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); + +const TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; + +function deterministicSha256Hash(s: string): string { + const hash = createHash("sha256").update(s).digest("hex"); + const fullBigInt = BigInt("0x" + hash); + const masked = fullBigInt & 0x7fffffffffffffffn; + return masked === 0n ? "1" : masked.toString(10); +} + +function getDurableExecutionRootSpanId(executionArn: string): string { + return deterministicSha256Hash(`durable-root:${executionArn}`); +} + +/** + * Find the highest-numbered `_dd_trace_context_{N}` checkpoint in the event. + * Each invocation that changes trace context saves a new checkpoint with + * N+1; the one with the highest N is the most recent. + */ +function findLatestTraceContextCheckpoint( + event: DurableExecutionEvent, +): { number: number; headers: Record } | null { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + let best: { number: number; op: DurableExecutionOperation } | null = null; + for (const op of operations) { + const name = op?.Name; + if (!name || !name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) continue; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) continue; + if (best === null || n > best.number) { + best = { number: n, op }; + } + } + if (best === null) return null; + + const raw = best.op.Payload ?? best.op.StepDetails?.Result; + if (!raw || typeof raw !== "string") return null; + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object") { + durableTraceDebugLog("Found latest trace-context checkpoint", { + checkpointName: best.op.Name, + checkpointNumber: best.number, + operationId: best.op.Id, + hasPayload: Boolean(best.op.Payload), + hasStepResult: Boolean(best.op.StepDetails?.Result), + }); + return { number: best.number, headers: parsed as Record }; + } + } catch (e) { + logDebug(`Failed to parse trace checkpoint payload: ${e}`); + durableTraceDebugLog("Failed to parse trace-context checkpoint payload", { + checkpointName: best.op.Name, + checkpointNumber: best.number, + operationId: best.op.Id, + parseError: e instanceof Error ? e.message : String(e), + }); + } + return null; +} + +/** + * Try to extract a real Datadog trace context from the original customer event + * stored inside the durable execution envelope. + * + * The original event is stored in Operations[0].ExecutionDetails.InputPayload. + * Since all invocations replay the same stored event, any trace headers injected + * by an upstream Datadog-traced service will be present on every invocation. + * + * Returns extracted context info or null. + */ +function extractUpstreamTraceContext( + event: DurableExecutionEvent, +): { traceId: string; parentId: string; samplingPriority: string; ptid: string } | null { + try { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + const firstOp = operations[0]; + const inputPayloadStr = firstOp.ExecutionDetails?.InputPayload; + if (!inputPayloadStr) return null; + + const customerEvent = JSON.parse(inputPayloadStr); + if (!customerEvent || typeof customerEvent !== "object") return null; + + // Try headers (API Gateway, ALB, Function URL) + const headers = customerEvent.headers; + if (headers && typeof headers === "object") { + const traceId = headers["x-datadog-trace-id"]; + const parentId = headers["x-datadog-parent-id"]; + if (traceId && parentId) { + const samplingPriority = headers["x-datadog-sampling-priority"] || "1"; + const tags = headers["x-datadog-tags"] || ""; + const ptid = parsePtid(tags); + logDebug(`Found upstream trace context in customer event headers`); + return { traceId, parentId, samplingPriority, ptid }; + } + } + + // Try _datadog field (direct invocation / Step Functions) + const ddData = customerEvent._datadog; + if (ddData && typeof ddData === "object") { + const traceId = ddData["x-datadog-trace-id"]; + const parentId = ddData["x-datadog-parent-id"]; + if (traceId && parentId) { + const samplingPriority = ddData["x-datadog-sampling-priority"] || "1"; + const tags = ddData["x-datadog-tags"] || ""; + const ptid = parsePtid(tags); + logDebug(`Found upstream trace context in customer event _datadog field`); + return { traceId, parentId, samplingPriority, ptid }; + } + } + } catch (e) { + logDebug(`Failed to extract upstream trace context from durable event: ${e}`); + } + + return null; +} + +/** + * Parse _dd.p.tid from x-datadog-tags string. + * Format: "_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0" + */ +function parsePtid(tags: string): string { + if (!tags) return ""; + for (const tag of tags.split(",")) { + if (tag.includes("_dd.p.tid=")) { + return tag.split("=")[1] || ""; + } + } + return ""; +} + +/** + * Durable Execution Trace Extractor — Deterministic Approach with W3C Support + * + * Strategy: + * 1. Try to extract real upstream trace context from customer event + * 2. Fall back to deterministic 128-bit trace ID from execution ARN + * + * In both cases: + * - parent_id links to the last completed operation for replay chaining + * - _dd.p.tid is set for full 128-bit W3C trace ID support + */ +export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + extract(event: unknown): SpanContextWrapper | null { + if (!isDurableExecutionEvent(event)) { + logDebug("Event is not a durable execution event"); + return null; + } + + const executionArn = event.DurableExecutionArn; + if (!executionArn) { + logDebug("No DurableExecutionArn in event"); + return null; + } + + const operations = event.InitialExecutionState?.Operations; + durableTraceDebugLog("Durable invocation event received", { + executionArn, + checkpointToken: event.CheckpointToken, + operationCount: operations?.length ?? 0, + }); + if (operations?.length) { + const checkpointOperations = operations + .filter((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) + .map((op) => ({ + id: op.Id, + name: op.Name, + status: op.Status, + hasPayload: Boolean(op.Payload), + hasStepResult: Boolean(op.StepDetails?.Result), + })); + durableTraceDebugLog("Trace-context checkpoint operations present in event", { + checkpoints: checkpointOperations, + }); + } + + // --- Step 0: Prefer a previously-saved trace-context checkpoint --- + // If a previous invocation saved a `_dd_trace_context_{N}` checkpoint, use + // the one with the highest N — it reflects the latest trace-context state + // of the ongoing durable execution. Same scheme as dd-trace-py. + const latestCheckpoint = findLatestTraceContextCheckpoint(event); + const executionRootSpanId = getDurableExecutionRootSpanId(executionArn); + if (latestCheckpoint) { + logDebug( + `Using trace context from checkpoint _dd_trace_context_${latestCheckpoint.number}`, + ); + const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; + const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; + const samplingPriorityStr = latestCheckpoint.headers["x-datadog-sampling-priority"] || "1"; + const tagsStr = latestCheckpoint.headers["x-datadog-tags"] || ""; + let ptidFromTags = parsePtid(tagsStr); + let effectiveTraceId = traceIdStr; + let effectiveParentId = parentIdStr; + + if ((!effectiveTraceId || !effectiveParentId) && latestCheckpoint.headers.traceparent) { + const parsedTraceparent = parseTraceparentHex(latestCheckpoint.headers.traceparent); + if (parsedTraceparent) { + effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; + effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; + ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; + durableTraceDebugLog("Derived Datadog IDs from traceparent in checkpoint", { + checkpointNumber: latestCheckpoint.number, + traceparent: latestCheckpoint.headers.traceparent, + derivedTraceId: effectiveTraceId, + derivedParentId: effectiveParentId, + derivedPtid: ptidFromTags, + }); + } + } + + const normalizedTraceId = normalizeTraceIdToDecimal(effectiveTraceId); + const normalizedParentId = normalizeParentIdToDecimal(effectiveParentId); + if (!ptidFromTags && normalizedTraceId.ptidFromTraceId) { + ptidFromTags = normalizedTraceId.ptidFromTraceId; + } + + durableTraceDebugLog("Normalized checkpoint IDs", { + checkpointNumber: latestCheckpoint.number, + rawTraceId: effectiveTraceId, + rawParentId: effectiveParentId, + normalizedTraceId: normalizedTraceId.traceId, + normalizedParentId, + executionRootSpanId, + ptid: ptidFromTags, + }); + + durableTraceDebugLog("Checkpoint headers selected for extraction", { + checkpointNumber: latestCheckpoint.number, + headerKeys: Object.keys(latestCheckpoint.headers), + traceId: normalizedTraceId.traceId, + parentId: normalizedParentId, + samplingPriority: samplingPriorityStr, + ptid: ptidFromTags, + }); + + if (normalizedTraceId.traceId) { + try { + const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const ddSpanContext = new _DatadogSpanContext({ + traceId: id(normalizedTraceId.traceId, 10), + // Use deterministic root span id from executionArn so all invocations + // remain anchored to the same durable root regardless of checkpoint payload format. + spanId: id(executionRootSpanId, 10), + sampling: { priority: samplingPriorityStr }, + }); + + if (ptidFromTags) { + ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; + } + + durableTraceDebugLog("Activated trace context from checkpoint", { + checkpointNumber: latestCheckpoint.number, + activatedTraceId: normalizedTraceId.traceId, + activatedParentId: executionRootSpanId, + activatedPtid: ptidFromTags, + checkpointParentId: normalizedParentId, + }); + return new SpanContextWrapper(ddSpanContext, TraceSource.Event); + } catch (e) { + logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); + durableTraceDebugLog("Failed to activate trace context from checkpoint", { + checkpointNumber: latestCheckpoint.number, + activationError: e instanceof Error ? e.message : String(e), + traceId: normalizedTraceId.traceId, + parentId: executionRootSpanId, + ptid: ptidFromTags, + }); + // Fall through to existing paths + } + } else { + durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { + checkpointNumber: latestCheckpoint.number, + hasTraceId: Boolean(normalizedTraceId.traceId), + hasParentId: Boolean(normalizedParentId || executionRootSpanId), + traceparent: latestCheckpoint.headers.traceparent, + }); + } + } else { + durableTraceDebugLog("No trace-context checkpoint found in event operations", { + executionArn, + operationCount: operations?.length ?? 0, + }); + } + + // --- Step 1: Try to use real upstream trace context --- + const upstream = extractUpstreamTraceContext(event); + + let traceId: string; + let ptid: string; + const rootSpanId = executionRootSpanId; + let samplingPriority: string; + + if (upstream) { + const normalizedUpstreamTrace = normalizeTraceIdToDecimal(upstream.traceId); + const normalizedTraceId = normalizedUpstreamTrace.traceId; + + if (normalizedTraceId) { + traceId = normalizedTraceId; + ptid = upstream.ptid || normalizedUpstreamTrace.ptidFromTraceId || ""; + samplingPriority = upstream.samplingPriority; + logDebug(`Using upstream trace_id=${traceId}, _dd.p.tid=${ptid}`); + } else { + const randomTrace = generateRandomTraceId128(); + traceId = randomTrace.traceId; + ptid = randomTrace.ptid; + samplingPriority = SampleMode.AUTO_KEEP.toString(); + logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); + } + + // For the first invocation (no checkpoint), use the deterministic durable + // root span id derived from executionArn. + durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { + traceId, + rootSpanId, + upstreamParentId: upstream.parentId, + samplingPriority, + ptid, + }); + } else { + // --- Step 2: No checkpoint and no upstream context --- + // Start a new trace and use deterministic root span id that will be + // referenced by checkpoints in later invocations. + const randomTrace = generateRandomTraceId128(); + traceId = randomTrace.traceId; + ptid = randomTrace.ptid; + samplingPriority = SampleMode.AUTO_KEEP.toString(); + + logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + durableTraceDebugLog("No checkpoint/upstream found; generated new durable trace context", { + executionArn, + traceId, + rootSpanId, + ptid, + }); + } + + logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + + // Construct span context with _dd.p.tid for 128-bit W3C trace ID support + // Similar to Step Functions' approach in step-function-service.ts + try { + const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const ddSpanContext = new _DatadogSpanContext({ + traceId: id(traceId, 10), + spanId: id(rootSpanId, 10), + sampling: { priority: samplingPriority }, + }); + + // Set _dd.p.tid for upper 64 bits of 128-bit trace ID + if (ptid) { + ddSpanContext._trace.tags["_dd.p.tid"] = ptid; + } + + return new SpanContextWrapper(ddSpanContext, TraceSource.Event); + } catch (error) { + if (error instanceof Error) { + logDebug("Couldn't generate SpanContext with tracer, falling back.", error); + } + } + + // Fallback without _dd.p.tid if dd-trace is not available + return SpanContextWrapper.fromTraceContext({ + traceId, + parentId: rootSpanId, + sampleMode: parseInt(samplingPriority, 10), + source: TraceSource.Event, + }); + } +} + +/** + * Utility to check if a durable operation is a replay + * + * An operation is a replay if it exists in the initial execution state + * with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT) + * + * @param event - Lambda event + * @param stepId - The step ID to check (may be hashed) + * @returns true if the operation is a replay + */ +export function isOperationReplay(event: unknown, stepId: string): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) { + return false; + } + + const operation = operations.find((op) => op.Id === stepId); + if (!operation) { + return false; + } + + return TERMINAL_STATUSES.has(operation.Status); +} + +/** + * Get the replay status of an operation + * + * @param event - Lambda event + * @param stepId - The step ID to check + * @returns Operation status if found, undefined otherwise + */ +export function getOperationStatus(event: unknown, stepId: string): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return undefined; + } + + const operation = operations.find((op) => op.Id === stepId); + return operation?.Status; +} + +/** + * Count the number of completed operations in the event + * + * @param event - Lambda event + * @returns Number of completed operations + */ +export function getCompletedOperationCount(event: unknown): number { + if (!isDurableExecutionEvent(event)) { + return 0; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return 0; + } + + return operations.filter((op) => + op.Status === "SUCCEEDED" || op.Status === "FAILED" + ).length; +} + +/** + * Create (or re-emit) the durable execution root span. + * + * Every invocation emits this span with the same propagated root span_id. + * The Datadog backend deduplicates by span_id, so the last invocation's + * version wins with the correct total duration (start → final end). + * + * Returns an object with { span, finish() } or null if not a durable execution. + * Caller must call finish() when the invocation ends. + */ +export function createDurableExecutionRootSpan( + event: unknown, + extractedRootContext?: SpanContextWrapper | null, +): { span: any; finish: () => void } | null { + if (!isDurableExecutionEvent(event)) { + return null; + } + + const executionArn = event.DurableExecutionArn; + if (!executionArn) { + return null; + } + + const operations = event.InitialExecutionState?.Operations; + const hasCheckpoint = Boolean( + operations?.some((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)), + ); + const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); + const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; + + if (!isLikelyFirstInvocation) { + durableTraceDebugLog("Skipping durable root span creation for replay invocation", { + executionArn, + operationCount: operations?.length ?? 0, + hasCheckpoint, + hasCompletedOperation, + }); + return null; + } + + const rootSpanId = getDurableExecutionRootSpanId(executionArn); + const extractedTraceId = extractedRootContext?.toTraceId(); + const extractedSpanId = extractedRootContext?.toSpanId(); + durableTraceDebugLog("Preparing durable root span creation", { + executionArn, + extractedTraceId, + extractedSpanId, + expectedRootSpanId: rootSpanId, + operationCount: operations?.length ?? 0, + }); + + // Determine consistent start_time from the first operation's StartTimestamp + // StartTimestamp is unix milliseconds from the durable execution SDK + let startTime: number | undefined; + const replayOperations = event.InitialExecutionState?.Operations; + if (replayOperations && replayOperations.length > 0) { + const firstStartTs = replayOperations[0].StartTimestamp; + if (firstStartTs != null) { + const parsed = Number(firstStartTs); + if (!isNaN(parsed)) { + startTime = parsed; // already in millis, dd-trace startSpan expects millis + } + } + } + + try { + const tracer = require("dd-trace"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; + const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; + + const spanOptions: Record = { + type: "serverless", + tags: { + "service.name": serviceName, + "resource.name": resourceName, + "durable.execution_arn": executionArn, + "durable.is_root_span": true, + "durable.invocation_count": replayOperations?.length ?? 0, + }, + }; + + if (startTime !== undefined) { + spanOptions.startTime = startTime; + } + if (extractedRootContext?.spanContext) { + // Ensure the durable root span stays in the same trace as the extracted + // durable invocation context even when there is no active scope. + spanOptions.childOf = extractedRootContext.spanContext; + } + + let activeBefore: any; + try { + activeBefore = tracer?.scope?.().active?.(); + } catch { + activeBefore = undefined; + } + durableTraceDebugLog("Creating durable root span", { + executionArn, + startTime, + expectedRootSpanId: rootSpanId, + activeScopeBefore: activeBefore ? { + traceId: activeBefore.context?.()?.toTraceId?.(), + spanId: activeBefore.context?.()?.toSpanId?.(), + } : null, + }); + + const span = tracer.startSpan("aws.durable-execution", spanOptions); + + // Re-emit root span using the extracted root span_id so all invocations + // refer to the same durable execution root (propagated by checkpoints). + try { + if (rootSpanId) { + span.context()._spanId = id(rootSpanId, 10); + } + } catch (e) { + logDebug(`Failed to set durable root span_id: ${e}`); + } + + // Fix parent_id: the active context has span_id=root_span_id (set by + // DurableExecutionEventTraceExtractor.extract), so tracer.startSpan() + // inherits that as parent_id, causing self-parenting. The root span's + // parent should be the upstream caller (if extracted) or 0 (true root). + try { + const upstream = extractUpstreamTraceContext(event as DurableExecutionEvent); + if (upstream) { + span.context()._parentId = id(upstream.parentId, 10); + } else { + span.context()._parentId = id("0", 10); + } + } catch (e) { + logDebug(`Failed to set root span parent_id: ${e}`); + } + + const createdTraceId = span.context().toTraceId?.(); + const createdSpanId = span.context().toSpanId?.(); + const createdParentId = span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(); + durableTraceDebugLog("Durable root span created", { + executionArn, + createdTraceId, + createdSpanId, + createdParentId, + expectedRootSpanId: rootSpanId, + extractedTraceId, + extractedSpanId, + traceMatchesExtracted: extractedTraceId ? createdTraceId === extractedTraceId : undefined, + spanMatchesExpectedRoot: createdSpanId === rootSpanId, + }); + if (extractedTraceId && createdTraceId && extractedTraceId !== createdTraceId) { + durableTraceDebugLog("WARNING: durable root span trace differs from extracted durable context", { + executionArn, + extractedTraceId, + createdTraceId, + note: "This can cause the durable root span to appear standalone.", + }); + } + + logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); + + return { + span, + finish: () => { + durableTraceDebugLog("Finishing durable root span", { + executionArn, + traceId: span.context().toTraceId?.(), + spanId: span.context().toSpanId?.(), + parentId: span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(), + }); + span.finish(); + logDebug("Finished root execution span"); + }, + }; + } catch (e) { + logDebug(`Failed to create durable execution root span: ${e}`); + return null; + } +} diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 6bd690713..0fbbf081b 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -9,3 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; +export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay, createDurableExecutionRootSpan } from "./durable-execution"; diff --git a/src/trace/listener.ts b/src/trace/listener.ts index f068c5352..628f8ec42 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -27,6 +27,7 @@ import { } from "./durable-function-context"; import { XrayService } from "./xray-service"; import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http"; +import { createDurableExecutionRootSpan } from "./context/extractors/durable-execution"; import { getSpanPointerAttributes, SpanPointerAttributes } from "../utils/span-pointers"; export type TraceExtractor = (event: any, context: Context) => Promise | TraceContext; @@ -102,6 +103,7 @@ export class TraceListener { private wrappedCurrentSpan?: SpanWrapper; private triggerTags?: { [key: string]: string }; private lambdaSpanParentContext?: SpanContext; + private durableRootSpan?: { span: any; finish: () => void }; private spanPointerAttributesList: SpanPointerAttributes[] | undefined; public get currentTraceHeaders() { @@ -115,6 +117,12 @@ export class TraceListener { } public async onStartInvocation(event: any, context: Context) { + const durableTraceDebugEnabled = (() => { + const value = process.env.DD_DURABLE_TRACE_DEBUG; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; + })(); const tracerInitialized = this.tracerWrapper.isTracerAvailable; if (this.config.injectLogContext) { patchConsole(console, this.contextService); @@ -143,6 +151,32 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } + // Create the durable execution root span before everything else. + // This span uses the propagated root span_id and is re-emitted on every + // invocation (last one wins in the backend with correct total duration). + if (durableTraceDebugEnabled && spanContextWrapper) { + console.log("[dd-lambda][durable-trace] Listener root-span inputs", { + traceSource: this.contextService.traceSource, + extracted: { + traceId: spanContextWrapper.toTraceId(), + parentId: spanContextWrapper.toSpanId(), + sampleMode: spanContextWrapper.sampleMode(), + }, + }); + } + this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; + if (durableTraceDebugEnabled) { + const durableRootSpanContext = this.durableRootSpan?.span?.context?.(); + console.log("[dd-lambda][durable-trace] Listener root-span creation result", { + created: Boolean(this.durableRootSpan), + rootSpan: durableRootSpanContext ? { + traceId: durableRootSpanContext.toTraceId?.(), + spanId: durableRootSpanContext.toSpanId?.(), + parentId: durableRootSpanContext._parentId?.toString?.(10) ?? durableRootSpanContext._parentId?.toString?.(), + } : null, + }); + } + if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, @@ -309,6 +343,13 @@ export class TraceListener { this.injectAuthorizerSpan(result, event?.requestContext?.requestId, finishTime || Date.now()); } + // Finish the durable execution root span after all other spans. + // Re-emitted every invocation; the last one wins in the backend. + if (this.durableRootSpan) { + this.durableRootSpan.finish(); + this.durableRootSpan = undefined; + } + // Reset singletons and trace context this.stepFunctionContext = undefined; this.durableFunctionContext = undefined; From 965710ebb023393c7525e9a133880d5d40e68d86 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 29 Apr 2026 23:53:14 -0400 Subject: [PATCH 02/15] some updates and fixes --- src/trace/context/extractor.ts | 28 +--- .../extractors/durable-execution.spec.ts | 14 +- .../context/extractors/durable-execution.ts | 125 +++++++++--------- 3 files changed, 66 insertions(+), 101 deletions(-) diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 233887308..34afff0f6 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -46,12 +46,6 @@ export class TraceContextExtractor { async extract(event: any, context: Context): Promise { let spanContext: SpanContextWrapper | null = null; - const durableTraceDebugEnabled = (() => { - const value = process.env.DD_DURABLE_TRACE_DEBUG; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; - })(); if (this.config.traceExtractor) { const customExtractor = new CustomTraceExtractor(this.config.traceExtractor); spanContext = await customExtractor.extract(event, context); @@ -61,20 +55,11 @@ export class TraceContextExtractor { const eventExtractor = this.getTraceEventExtractor(event); if (eventExtractor !== undefined) { spanContext = eventExtractor.extract(event); - if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { - console.log("[dd-lambda][durable-trace] Event extractor result", { - extractor: eventExtractor.constructor?.name, - extracted: spanContext ? { - traceId: spanContext.toTraceId(), - parentId: spanContext.toSpanId(), - sampleMode: spanContext.sampleMode(), - } : null, - }); - } } } - // No stripping needed — deterministic approach never modifies checkpoint payloads. + // No stripping needed — trace context is stored in dedicated + // `_datadog_{N}` checkpoint operations. if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); @@ -87,15 +72,6 @@ export class TraceContextExtractor { if (spanContext === null) { const contextExtractor = new LambdaContextTraceExtractor(this.tracerWrapper); spanContext = contextExtractor.extract(context); - if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { - console.log("[dd-lambda][durable-trace] Falling back to Lambda context extraction", { - extracted: spanContext ? { - traceId: spanContext.toTraceId(), - parentId: spanContext.toSpanId(), - sampleMode: spanContext.sampleMode(), - } : null, - }); - } } if (spanContext !== null) { diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index 70b2adc2e..864a35e9c 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,16 +1,9 @@ -import { createHash } from "crypto"; import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; jest.mock("dd-trace", () => ({ startSpan: jest.fn(), })); -function deterministicRootSpanId(executionArn: string): string { - const hash = createHash("sha256").update(`durable-root:${executionArn}`).digest("hex"); - const masked = BigInt(`0x${hash}`) & 0x7fffffffffffffffn; - return masked === 0n ? "1" : masked.toString(10); -} - describe("DurableExecutionEventTraceExtractor", () => { const tracer = require("dd-trace"); const startSpanMock = tracer.startSpan as jest.Mock; @@ -19,7 +12,7 @@ describe("DurableExecutionEventTraceExtractor", () => { jest.clearAllMocks(); }); - it("extracts a deterministic durable root span id from executionArn", () => { + it("extracts trace context from the latest trace checkpoint", () => { const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; @@ -30,7 +23,7 @@ describe("DurableExecutionEventTraceExtractor", () => { Operations: [ { Id: "op-1", - Name: "_dd_trace_context_0", + Name: "_datadog_0", Status: "SUCCEEDED", StepDetails: { Result: JSON.stringify({ @@ -93,7 +86,7 @@ describe("DurableExecutionEventTraceExtractor", () => { expect(root).not.toBeNull(); expect(startSpanMock).toHaveBeenCalledTimes(1); - expect(root?.span.context()._spanId.toString(10)).toBe("2222222222222222222"); + expect(root?.span.context()._spanId.toString(10)).toBe(extracted?.toSpanId()); }); it("skips durable root span creation on replay invocations", () => { @@ -134,4 +127,3 @@ describe("DurableExecutionEventTraceExtractor", () => { expect(startSpanMock).not.toHaveBeenCalled(); }); }); - diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 88b9f5bdf..c2976b6ee 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -1,28 +1,14 @@ /** - * Durable Execution Trace Extractor — Deterministic Approach - * - * Generates deterministic trace context from AWS Lambda Durable Execution events - * using SHA-256 hashing of the execution ARN and operation identifiers. + * Durable Execution Trace Extractor — Checkpoint/Upstream Approach * * Strategy: - * 1. First, try to extract a real trace context from the original customer event - * (stored in Operations[0].ExecutionDetails.InputPayload). If found, the durable - * execution trace connects to the upstream caller's trace. - * 2. If no upstream context exists, fall back to deterministic hashing from the - * execution ARN, generating a full 128-bit trace ID (lower 64 bits + _dd.p.tid) - * for W3C/OpenTelemetry compatibility. - * - * Flow: - * 1. Every invocation receives the same DurableExecutionArn - * 2. The original customer event is stored in Operations[0].ExecutionDetails.InputPayload - * and is identical across all invocations — any upstream trace headers persist - * 3. trace_id = real upstream or hash("durable-trace:{arn}") lower 64 bits - * 4. _dd.p.tid = real upstream or hash("durable-trace:{arn}") upper 64 bits - * 5. parent_id = hash("durable-span:{arn}#{last_completed_op_id}") — links to previous invocation - * 6. Checkpoint payloads are never modified + * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. + * 2. If no trace checkpoint exists (first invocation), try upstream trace context + * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. + * 3. If neither exists, start a new trace with random IDs. */ -import { createHash, randomBytes } from "crypto"; +import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; import { SampleMode, TraceSource } from "../../trace-context-service"; @@ -262,37 +248,45 @@ function bufferToBigInt(buf: Buffer): bigint { // Terminal operation statuses that indicate an operation has completed const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); -const TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; +const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; +const LEGACY_TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; +const TRACE_CHECKPOINT_NAME_PREFIXES = [ + TRACE_CHECKPOINT_NAME_PREFIX, + LEGACY_TRACE_CHECKPOINT_NAME_PREFIX, +]; + +function parseTraceCheckpointNumber(name: unknown): number | null { + if (typeof name !== "string") return null; -function deterministicSha256Hash(s: string): string { - const hash = createHash("sha256").update(s).digest("hex"); - const fullBigInt = BigInt("0x" + hash); - const masked = fullBigInt & 0x7fffffffffffffffn; - return masked === 0n ? "1" : masked.toString(10); + const prefix = TRACE_CHECKPOINT_NAME_PREFIXES.find((candidate) => name.startsWith(candidate)); + if (!prefix) return null; + + const suffix = name.slice(prefix.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) return null; + return n; } -function getDurableExecutionRootSpanId(executionArn: string): string { - return deterministicSha256Hash(`durable-root:${executionArn}`); +function isTraceCheckpointName(name: unknown): boolean { + return parseTraceCheckpointNumber(name) !== null; } /** - * Find the highest-numbered `_dd_trace_context_{N}` checkpoint in the event. + * Find the highest-numbered `_datadog_{N}` checkpoint in the event. + * Also supports legacy `_dd_trace_context_{N}` checkpoints for compatibility. * Each invocation that changes trace context saves a new checkpoint with * N+1; the one with the highest N is the most recent. */ function findLatestTraceContextCheckpoint( event: DurableExecutionEvent, -): { number: number; headers: Record } | null { +): { number: number; name: string; headers: Record } | null { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; let best: { number: number; op: DurableExecutionOperation } | null = null; for (const op of operations) { - const name = op?.Name; - if (!name || !name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) continue; - const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); - const n = Number.parseInt(suffix, 10); - if (Number.isNaN(n) || String(n) !== suffix) continue; + const n = parseTraceCheckpointNumber(op?.Name); + if (n === null) continue; if (best === null || n > best.number) { best = { number: n, op }; } @@ -311,7 +305,11 @@ function findLatestTraceContextCheckpoint( hasPayload: Boolean(best.op.Payload), hasStepResult: Boolean(best.op.StepDetails?.Result), }); - return { number: best.number, headers: parsed as Record }; + return { + number: best.number, + name: String(best.op.Name), + headers: parsed as Record, + }; } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); @@ -398,15 +396,12 @@ function parsePtid(tags: string): string { } /** - * Durable Execution Trace Extractor — Deterministic Approach with W3C Support + * Durable Execution Trace Extractor * * Strategy: - * 1. Try to extract real upstream trace context from customer event - * 2. Fall back to deterministic 128-bit trace ID from execution ARN - * - * In both cases: - * - parent_id links to the last completed operation for replay chaining - * - _dd.p.tid is set for full 128-bit W3C trace ID support + * 1. Prefer `_datadog_{N}` checkpoint context when present. + * 2. Otherwise, derive trace linkage from upstream customer event context. + * 3. If none exists, start a new random trace context. */ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { extract(event: unknown): SpanContextWrapper | null { @@ -429,7 +424,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor }); if (operations?.length) { const checkpointOperations = operations - .filter((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) + .filter((op) => isTraceCheckpointName(op?.Name)) .map((op) => ({ id: op.Id, name: op.Name, @@ -443,14 +438,13 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor } // --- Step 0: Prefer a previously-saved trace-context checkpoint --- - // If a previous invocation saved a `_dd_trace_context_{N}` checkpoint, use + // If a previous invocation saved a `_datadog_{N}` checkpoint, use // the one with the highest N — it reflects the latest trace-context state // of the ongoing durable execution. Same scheme as dd-trace-py. const latestCheckpoint = findLatestTraceContextCheckpoint(event); - const executionRootSpanId = getDurableExecutionRootSpanId(executionArn); if (latestCheckpoint) { logDebug( - `Using trace context from checkpoint _dd_trace_context_${latestCheckpoint.number}`, + `Using trace context from checkpoint ${latestCheckpoint.name}`, ); const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; @@ -488,7 +482,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor rawParentId: effectiveParentId, normalizedTraceId: normalizedTraceId.traceId, normalizedParentId, - executionRootSpanId, ptid: ptidFromTags, }); @@ -501,16 +494,14 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor ptid: ptidFromTags, }); - if (normalizedTraceId.traceId) { + if (normalizedTraceId.traceId && normalizedParentId) { try { const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); const id = require("dd-trace/packages/dd-trace/src/id"); const ddSpanContext = new _DatadogSpanContext({ traceId: id(normalizedTraceId.traceId, 10), - // Use deterministic root span id from executionArn so all invocations - // remain anchored to the same durable root regardless of checkpoint payload format. - spanId: id(executionRootSpanId, 10), + spanId: id(normalizedParentId, 10), sampling: { priority: samplingPriorityStr }, }); @@ -521,9 +512,8 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor durableTraceDebugLog("Activated trace context from checkpoint", { checkpointNumber: latestCheckpoint.number, activatedTraceId: normalizedTraceId.traceId, - activatedParentId: executionRootSpanId, + activatedParentId: normalizedParentId, activatedPtid: ptidFromTags, - checkpointParentId: normalizedParentId, }); return new SpanContextWrapper(ddSpanContext, TraceSource.Event); } catch (e) { @@ -532,16 +522,24 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor checkpointNumber: latestCheckpoint.number, activationError: e instanceof Error ? e.message : String(e), traceId: normalizedTraceId.traceId, - parentId: executionRootSpanId, + parentId: normalizedParentId, ptid: ptidFromTags, }); - // Fall through to existing paths + const fallback = SpanContextWrapper.fromTraceContext({ + traceId: normalizedTraceId.traceId, + parentId: normalizedParentId, + sampleMode: parseInt(samplingPriorityStr, 10), + source: TraceSource.Event, + }); + if (fallback) { + return fallback; + } } } else { durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { checkpointNumber: latestCheckpoint.number, hasTraceId: Boolean(normalizedTraceId.traceId), - hasParentId: Boolean(normalizedParentId || executionRootSpanId), + hasParentId: Boolean(normalizedParentId), traceparent: latestCheckpoint.headers.traceparent, }); } @@ -557,7 +555,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor let traceId: string; let ptid: string; - const rootSpanId = executionRootSpanId; + const rootSpanId = generateRandomPositiveId(); let samplingPriority: string; if (upstream) { @@ -577,8 +575,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); } - // For the first invocation (no checkpoint), use the deterministic durable - // root span id derived from executionArn. + // For first invocation, create a new durable root span id and chain aws.lambda to it. durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { traceId, rootSpanId, @@ -588,8 +585,8 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor }); } else { // --- Step 2: No checkpoint and no upstream context --- - // Start a new trace and use deterministic root span id that will be - // referenced by checkpoints in later invocations. + // Start a new trace and create a random durable root span id that + // checkpoints will carry across subsequent invocations. const randomTrace = generateRandomTraceId128(); traceId = randomTrace.traceId; ptid = randomTrace.ptid; @@ -735,7 +732,7 @@ export function createDurableExecutionRootSpan( const operations = event.InitialExecutionState?.Operations; const hasCheckpoint = Boolean( - operations?.some((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)), + operations?.some((op) => isTraceCheckpointName(op?.Name)), ); const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; @@ -750,9 +747,9 @@ export function createDurableExecutionRootSpan( return null; } - const rootSpanId = getDurableExecutionRootSpanId(executionArn); const extractedTraceId = extractedRootContext?.toTraceId(); const extractedSpanId = extractedRootContext?.toSpanId(); + const rootSpanId = extractedSpanId || generateRandomPositiveId(); durableTraceDebugLog("Preparing durable root span creation", { executionArn, extractedTraceId, From ad4a2c4ef78a8a2e01453e10c7c35c831b68dda1 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 21:59:59 -0400 Subject: [PATCH 03/15] remove debugging ENV --- .../context/extractors/durable-execution.ts | 179 +----------------- src/trace/listener.ts | 27 --- 2 files changed, 1 insertion(+), 205 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index c2976b6ee..e2200e344 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -14,24 +14,6 @@ import { SpanContextWrapper } from "../../span-context-wrapper"; import { SampleMode, TraceSource } from "../../trace-context-service"; import { EventTraceExtractor } from "../extractor"; -const DURABLE_TRACE_DEBUG_ENV = "DD_DURABLE_TRACE_DEBUG"; - -function durableTraceDebugEnabled(): boolean { - const value = process.env[DURABLE_TRACE_DEBUG_ENV]; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; -} - -function durableTraceDebugLog(message: string, details?: Record): void { - if (!durableTraceDebugEnabled()) return; - if (details) { - console.log(`[dd-lambda][durable-trace] ${message}`, details); - return; - } - console.log(`[dd-lambda][durable-trace] ${message}`); -} - function parseTraceparentHex( traceparent: unknown, ): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { @@ -298,13 +280,6 @@ function findLatestTraceContextCheckpoint( try { const parsed = JSON.parse(raw); if (parsed && typeof parsed === "object") { - durableTraceDebugLog("Found latest trace-context checkpoint", { - checkpointName: best.op.Name, - checkpointNumber: best.number, - operationId: best.op.Id, - hasPayload: Boolean(best.op.Payload), - hasStepResult: Boolean(best.op.StepDetails?.Result), - }); return { number: best.number, name: String(best.op.Name), @@ -313,12 +288,6 @@ function findLatestTraceContextCheckpoint( } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); - durableTraceDebugLog("Failed to parse trace-context checkpoint payload", { - checkpointName: best.op.Name, - checkpointNumber: best.number, - operationId: best.op.Id, - parseError: e instanceof Error ? e.message : String(e), - }); } return null; } @@ -416,27 +385,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor return null; } - const operations = event.InitialExecutionState?.Operations; - durableTraceDebugLog("Durable invocation event received", { - executionArn, - checkpointToken: event.CheckpointToken, - operationCount: operations?.length ?? 0, - }); - if (operations?.length) { - const checkpointOperations = operations - .filter((op) => isTraceCheckpointName(op?.Name)) - .map((op) => ({ - id: op.Id, - name: op.Name, - status: op.Status, - hasPayload: Boolean(op.Payload), - hasStepResult: Boolean(op.StepDetails?.Result), - })); - durableTraceDebugLog("Trace-context checkpoint operations present in event", { - checkpoints: checkpointOperations, - }); - } - // --- Step 0: Prefer a previously-saved trace-context checkpoint --- // If a previous invocation saved a `_datadog_{N}` checkpoint, use // the one with the highest N — it reflects the latest trace-context state @@ -460,13 +408,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; - durableTraceDebugLog("Derived Datadog IDs from traceparent in checkpoint", { - checkpointNumber: latestCheckpoint.number, - traceparent: latestCheckpoint.headers.traceparent, - derivedTraceId: effectiveTraceId, - derivedParentId: effectiveParentId, - derivedPtid: ptidFromTags, - }); } } @@ -476,24 +417,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor ptidFromTags = normalizedTraceId.ptidFromTraceId; } - durableTraceDebugLog("Normalized checkpoint IDs", { - checkpointNumber: latestCheckpoint.number, - rawTraceId: effectiveTraceId, - rawParentId: effectiveParentId, - normalizedTraceId: normalizedTraceId.traceId, - normalizedParentId, - ptid: ptidFromTags, - }); - - durableTraceDebugLog("Checkpoint headers selected for extraction", { - checkpointNumber: latestCheckpoint.number, - headerKeys: Object.keys(latestCheckpoint.headers), - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - samplingPriority: samplingPriorityStr, - ptid: ptidFromTags, - }); - if (normalizedTraceId.traceId && normalizedParentId) { try { const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); @@ -508,23 +431,9 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor if (ptidFromTags) { ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; } - - durableTraceDebugLog("Activated trace context from checkpoint", { - checkpointNumber: latestCheckpoint.number, - activatedTraceId: normalizedTraceId.traceId, - activatedParentId: normalizedParentId, - activatedPtid: ptidFromTags, - }); return new SpanContextWrapper(ddSpanContext, TraceSource.Event); } catch (e) { logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); - durableTraceDebugLog("Failed to activate trace context from checkpoint", { - checkpointNumber: latestCheckpoint.number, - activationError: e instanceof Error ? e.message : String(e), - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - ptid: ptidFromTags, - }); const fallback = SpanContextWrapper.fromTraceContext({ traceId: normalizedTraceId.traceId, parentId: normalizedParentId, @@ -535,19 +444,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor return fallback; } } - } else { - durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { - checkpointNumber: latestCheckpoint.number, - hasTraceId: Boolean(normalizedTraceId.traceId), - hasParentId: Boolean(normalizedParentId), - traceparent: latestCheckpoint.headers.traceparent, - }); } - } else { - durableTraceDebugLog("No trace-context checkpoint found in event operations", { - executionArn, - operationCount: operations?.length ?? 0, - }); } // --- Step 1: Try to use real upstream trace context --- @@ -575,14 +472,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); } - // For first invocation, create a new durable root span id and chain aws.lambda to it. - durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { - traceId, - rootSpanId, - upstreamParentId: upstream.parentId, - samplingPriority, - ptid, - }); } else { // --- Step 2: No checkpoint and no upstream context --- // Start a new trace and create a random durable root span id that @@ -593,12 +482,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor samplingPriority = SampleMode.AUTO_KEEP.toString(); logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); - durableTraceDebugLog("No checkpoint/upstream found; generated new durable trace context", { - executionArn, - traceId, - rootSpanId, - ptid, - }); } logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); @@ -738,25 +621,10 @@ export function createDurableExecutionRootSpan( const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; if (!isLikelyFirstInvocation) { - durableTraceDebugLog("Skipping durable root span creation for replay invocation", { - executionArn, - operationCount: operations?.length ?? 0, - hasCheckpoint, - hasCompletedOperation, - }); return null; } - const extractedTraceId = extractedRootContext?.toTraceId(); - const extractedSpanId = extractedRootContext?.toSpanId(); - const rootSpanId = extractedSpanId || generateRandomPositiveId(); - durableTraceDebugLog("Preparing durable root span creation", { - executionArn, - extractedTraceId, - extractedSpanId, - expectedRootSpanId: rootSpanId, - operationCount: operations?.length ?? 0, - }); + const rootSpanId = extractedRootContext?.toSpanId() || generateRandomPositiveId(); // Determine consistent start_time from the first operation's StartTimestamp // StartTimestamp is unix milliseconds from the durable execution SDK @@ -799,22 +667,6 @@ export function createDurableExecutionRootSpan( spanOptions.childOf = extractedRootContext.spanContext; } - let activeBefore: any; - try { - activeBefore = tracer?.scope?.().active?.(); - } catch { - activeBefore = undefined; - } - durableTraceDebugLog("Creating durable root span", { - executionArn, - startTime, - expectedRootSpanId: rootSpanId, - activeScopeBefore: activeBefore ? { - traceId: activeBefore.context?.()?.toTraceId?.(), - spanId: activeBefore.context?.()?.toSpanId?.(), - } : null, - }); - const span = tracer.startSpan("aws.durable-execution", spanOptions); // Re-emit root span using the extracted root span_id so all invocations @@ -842,40 +694,11 @@ export function createDurableExecutionRootSpan( logDebug(`Failed to set root span parent_id: ${e}`); } - const createdTraceId = span.context().toTraceId?.(); - const createdSpanId = span.context().toSpanId?.(); - const createdParentId = span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(); - durableTraceDebugLog("Durable root span created", { - executionArn, - createdTraceId, - createdSpanId, - createdParentId, - expectedRootSpanId: rootSpanId, - extractedTraceId, - extractedSpanId, - traceMatchesExtracted: extractedTraceId ? createdTraceId === extractedTraceId : undefined, - spanMatchesExpectedRoot: createdSpanId === rootSpanId, - }); - if (extractedTraceId && createdTraceId && extractedTraceId !== createdTraceId) { - durableTraceDebugLog("WARNING: durable root span trace differs from extracted durable context", { - executionArn, - extractedTraceId, - createdTraceId, - note: "This can cause the durable root span to appear standalone.", - }); - } - logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); return { span, finish: () => { - durableTraceDebugLog("Finishing durable root span", { - executionArn, - traceId: span.context().toTraceId?.(), - spanId: span.context().toSpanId?.(), - parentId: span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(), - }); span.finish(); logDebug("Finished root execution span"); }, diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 628f8ec42..4b1aa1f95 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -117,12 +117,6 @@ export class TraceListener { } public async onStartInvocation(event: any, context: Context) { - const durableTraceDebugEnabled = (() => { - const value = process.env.DD_DURABLE_TRACE_DEBUG; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; - })(); const tracerInitialized = this.tracerWrapper.isTracerAvailable; if (this.config.injectLogContext) { patchConsole(console, this.contextService); @@ -154,28 +148,7 @@ export class TraceListener { // Create the durable execution root span before everything else. // This span uses the propagated root span_id and is re-emitted on every // invocation (last one wins in the backend with correct total duration). - if (durableTraceDebugEnabled && spanContextWrapper) { - console.log("[dd-lambda][durable-trace] Listener root-span inputs", { - traceSource: this.contextService.traceSource, - extracted: { - traceId: spanContextWrapper.toTraceId(), - parentId: spanContextWrapper.toSpanId(), - sampleMode: spanContextWrapper.sampleMode(), - }, - }); - } this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; - if (durableTraceDebugEnabled) { - const durableRootSpanContext = this.durableRootSpan?.span?.context?.(); - console.log("[dd-lambda][durable-trace] Listener root-span creation result", { - created: Boolean(this.durableRootSpan), - rootSpan: durableRootSpanContext ? { - traceId: durableRootSpanContext.toTraceId?.(), - spanId: durableRootSpanContext.toSpanId?.(), - parentId: durableRootSpanContext._parentId?.toString?.(10) ?? durableRootSpanContext._parentId?.toString?.(), - } : null, - }); - } if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( From e149fa639498c5f057c86e00fdd1e98eec842772 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:05:15 -0400 Subject: [PATCH 04/15] update comments --- src/trace/context/extractors/durable-execution.ts | 13 +++++++------ src/trace/listener.ts | 7 ++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index e2200e344..aa3dbf066 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -591,11 +591,12 @@ export function getCompletedOperationCount(event: unknown): number { } /** - * Create (or re-emit) the durable execution root span. + * Create the durable execution root span for likely first invocations only. * - * Every invocation emits this span with the same propagated root span_id. - * The Datadog backend deduplicates by span_id, so the last invocation's - * version wins with the correct total duration (start → final end). + * Replay invocations return null. The current first-invocation heuristic is: + * - no trace checkpoint operation exists + * - no operation has terminal status + * - operation count is <= 1 * * Returns an object with { span, finish() } or null if not a durable execution. * Caller must call finish() when the invocation ends. @@ -669,8 +670,8 @@ export function createDurableExecutionRootSpan( const span = tracer.startSpan("aws.durable-execution", spanOptions); - // Re-emit root span using the extracted root span_id so all invocations - // refer to the same durable execution root (propagated by checkpoints). + // Use the extracted durable root span_id when available to keep the + // durable root identity stable with propagated checkpoint context. try { if (rootSpanId) { span.context()._spanId = id(rootSpanId, 10); diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 4b1aa1f95..d3e8d8136 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -145,9 +145,10 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } - // Create the durable execution root span before everything else. - // This span uses the propagated root span_id and is re-emitted on every - // invocation (last one wins in the backend with correct total duration). + // Create the durable execution root span before everything else so later + // spans can parent correctly. Root creation is gated in + // createDurableExecutionRootSpan() and only happens for likely first + // invocations; replay invocations return null. this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; if (this.config.createInferredSpan) { From 4f0e77fdd15062b72e0760d698f6bb5b78bc9b61 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:14:55 -0400 Subject: [PATCH 05/15] refactor away the AI's complex extractor --- src/trace/context/extractor.ts | 2 +- .../extractors/durable-execution.spec.ts | 51 ++- .../context/extractors/durable-execution.ts | 364 +++--------------- src/trace/listener.ts | 1 - 4 files changed, 87 insertions(+), 331 deletions(-) diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 34afff0f6..cc03521bc 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -87,7 +87,7 @@ export class TraceContextExtractor { if (!event || typeof event !== "object") return; // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) - if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(); + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper); const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index 864a35e9c..b2353795b 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,9 +1,14 @@ import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; +import { TracerWrapper } from "../../tracer-wrapper"; jest.mock("dd-trace", () => ({ startSpan: jest.fn(), })); +function makeTracerWrapper(extractReturn: any = null): TracerWrapper { + return { extract: jest.fn().mockReturnValue(extractReturn) } as unknown as TracerWrapper; +} + describe("DurableExecutionEventTraceExtractor", () => { const tracer = require("dd-trace"); const startSpanMock = tracer.startSpan as jest.Mock; @@ -12,10 +17,16 @@ describe("DurableExecutionEventTraceExtractor", () => { jest.clearAllMocks(); }); - it("extracts trace context from the latest trace checkpoint", () => { + it("delegates checkpoint headers to the standard propagator", () => { const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + const checkpointHeaders = { + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }; + const event = { DurableExecutionArn: executionArn, CheckpointToken: "t-1", @@ -26,23 +37,34 @@ describe("DurableExecutionEventTraceExtractor", () => { Name: "_datadog_0", Status: "SUCCEEDED", StepDetails: { - Result: JSON.stringify({ - "x-datadog-trace-id": "149750110124521191", - "x-datadog-parent-id": "987654321012345678", - "x-datadog-sampling-priority": "1", - }), + Result: JSON.stringify(checkpointHeaders), }, }, ], }, }; - const extractor = new DurableExecutionEventTraceExtractor(); + const sentinelContext = { sentinel: true }; + const tracerWrapper = makeTracerWrapper(sentinelContext); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const context = extractor.extract(event); - expect(context).not.toBeNull(); - expect(context?.toTraceId()).toBe("149750110124521191"); - expect(context?.toSpanId()).toBe("987654321012345678"); + expect(tracerWrapper.extract).toHaveBeenCalledWith(checkpointHeaders); + expect(context).toBe(sentinelContext); + }); + + it("returns null when no checkpoint or upstream context exists", () => { + const tracerWrapper = makeTracerWrapper(); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + + const context = extractor.extract({ + DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", + CheckpointToken: "t-empty", + InitialExecutionState: { Operations: [] }, + }); + + expect(context).toBeNull(); + expect(tracerWrapper.extract).not.toHaveBeenCalled(); }); it("creates durable root span only for first invocation", () => { @@ -79,14 +101,10 @@ describe("DurableExecutionEventTraceExtractor", () => { }, }; - const extractor = new DurableExecutionEventTraceExtractor(); - const extracted = extractor.extract(firstInvocationEvent); - - const root = createDurableExecutionRootSpan(firstInvocationEvent, extracted); + const root = createDurableExecutionRootSpan(firstInvocationEvent, null); expect(root).not.toBeNull(); expect(startSpanMock).toHaveBeenCalledTimes(1); - expect(root?.span.context()._spanId.toString(10)).toBe(extracted?.toSpanId()); }); it("skips durable root span creation on replay invocations", () => { @@ -119,7 +137,8 @@ describe("DurableExecutionEventTraceExtractor", () => { }, }; - const extractor = new DurableExecutionEventTraceExtractor(); + const tracerWrapper = makeTracerWrapper({ source: "Event" }); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const extracted = extractor.extract(replayEvent); const root = createDurableExecutionRootSpan(replayEvent, extracted); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index aa3dbf066..9865eb0df 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -5,107 +5,19 @@ * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. * 2. If no trace checkpoint exists (first invocation), try upstream trace context * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. - * 3. If neither exists, start a new trace with random IDs. + * 3. If neither exists, return null and let the default extraction path create the context. + * + * The dd-trace-js durable-execution plugin writes checkpoint headers via the + * standard HTTP propagator (`tracer.inject(span, 'http_headers', headers)`), + * so we just hand the resulting header dict back to `tracer.extract` here. */ import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; -import { SampleMode, TraceSource } from "../../trace-context-service"; +import { TracerWrapper } from "../../tracer-wrapper"; import { EventTraceExtractor } from "../extractor"; -function parseTraceparentHex( - traceparent: unknown, -): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { - if (typeof traceparent !== "string") return null; - const parts = traceparent.split("-"); - if (parts.length !== 4) return null; - const [, traceIdHex, parentIdHex] = parts; - if (!/^[0-9a-f]{32}$/i.test(traceIdHex) || !/^[0-9a-f]{16}$/i.test(parentIdHex)) { - return null; - } - - const lower64TraceIdHex = traceIdHex.slice(16); - const upper64TraceIdHex = traceIdHex.slice(0, 16); - - try { - return { - traceIdHex, - parentIdHex, - lower64TraceIdDec: BigInt(`0x${lower64TraceIdHex}`).toString(10), - upper64TraceIdHex, - parentIdDec: BigInt(`0x${parentIdHex}`).toString(10), - }; - } catch { - return null; - } -} - -function normalizeParentIdToDecimal(parentId: unknown): string | null { - if (typeof parentId !== "string") return null; - const value = parentId.trim(); - if (!value) return null; - - if (/^[0-9]+$/.test(value)) { - return value; - } - - if (/^[0-9a-f]+$/i.test(value)) { - const hex = value.length > 16 ? value.slice(-16) : value; - try { - return BigInt(`0x${hex}`).toString(10); - } catch { - return null; - } - } - - return null; -} - -function normalizeTraceIdToDecimal( - traceId: unknown, -): { traceId: string | null; ptidFromTraceId?: string } { - if (typeof traceId !== "string") { - return { traceId: null }; - } - - const value = traceId.trim(); - if (!value) { - return { traceId: null }; - } - - if (/^[0-9]+$/.test(value)) { - return { traceId: value }; - } - - if (/^[0-9a-f]+$/i.test(value)) { - // If a 128-bit hex trace ID was accidentally put here, split it like traceparent: - // lower 64 bits for Datadog trace_id, upper 64 bits for _dd.p.tid. - if (value.length > 16) { - const upperHex = value.slice(-32, -16).padStart(16, "0"); - const lowerHex = value.slice(-16); - try { - return { - traceId: BigInt(`0x${lowerHex}`).toString(10), - ptidFromTraceId: upperHex.toLowerCase(), - }; - } catch { - return { traceId: null }; - } - } - - try { - return { - traceId: BigInt(`0x${value}`).toString(10), - }; - } catch { - return { traceId: null }; - } - } - - return { traceId: null }; -} - /** * Interface for operation data in durable execution state */ @@ -202,23 +114,6 @@ function generateRandomPositiveId(): string { return value === 0n ? "1" : value.toString(10); } -function generateRandomTraceId128(): { traceId: string; ptid: string } { - const bytes = randomBytes(16); - - // Upper 64 bits -> _dd.p.tid - const upperBytes = Buffer.from(bytes.subarray(0, 8)); - const upperValue = bufferToBigInt(upperBytes); - const ptid = (upperValue === 0n ? 1n : upperValue).toString(16).padStart(16, "0"); - - // Lower 64 bits -> Datadog trace_id (decimal) - const lowerBytes = Buffer.from(bytes.subarray(8, 16)); - lowerBytes[0] = lowerBytes[0] & 0x7f; // keep positive int64 - const lowerValue = bufferToBigInt(lowerBytes); - const traceId = lowerValue === 0n ? "1" : lowerValue.toString(10); - - return { traceId, ptid }; -} - function bufferToBigInt(buf: Buffer): bigint { let result = 0n; for (let i = 0; i < buf.length; i++) { @@ -254,14 +149,17 @@ function isTraceCheckpointName(name: unknown): boolean { } /** - * Find the highest-numbered `_datadog_{N}` checkpoint in the event. - * Also supports legacy `_dd_trace_context_{N}` checkpoints for compatibility. - * Each invocation that changes trace context saves a new checkpoint with - * N+1; the one with the highest N is the most recent. + * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return + * its parsed header dict. + * + * Each invocation that changes trace context saves a new checkpoint with N+1; + * the one with the highest N is the most recent. Headers are written by the + * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the + * payload is a standard HTTP-style header dict. + * + * Also accepts legacy `_dd_trace_context_{N}` names for compatibility. */ -function findLatestTraceContextCheckpoint( - event: DurableExecutionEvent, -): { number: number; name: string; headers: Record } | null { +function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; @@ -280,11 +178,7 @@ function findLatestTraceContextCheckpoint( try { const parsed = JSON.parse(raw); if (parsed && typeof parsed === "object") { - return { - number: best.number, - name: String(best.op.Name), - headers: parsed as Record, - }; + return parsed as Record; } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); @@ -293,230 +187,73 @@ function findLatestTraceContextCheckpoint( } /** - * Try to extract a real Datadog trace context from the original customer event - * stored inside the durable execution envelope. - * - * The original event is stored in Operations[0].ExecutionDetails.InputPayload. - * Since all invocations replay the same stored event, any trace headers injected - * by an upstream Datadog-traced service will be present on every invocation. - * - * Returns extracted context info or null. + * Find upstream HTTP headers carried by the original customer event stored in + * `Operations[0].ExecutionDetails.InputPayload`. Returns the standard header + * dict (keys like `x-datadog-trace-id`, `traceparent`, etc.) or null. */ -function extractUpstreamTraceContext( - event: DurableExecutionEvent, -): { traceId: string; parentId: string; samplingPriority: string; ptid: string } | null { +function findUpstreamHeaders(event: DurableExecutionEvent): Record | null { try { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; - const firstOp = operations[0]; - const inputPayloadStr = firstOp.ExecutionDetails?.InputPayload; + const inputPayloadStr = operations[0].ExecutionDetails?.InputPayload; if (!inputPayloadStr) return null; const customerEvent = JSON.parse(inputPayloadStr); if (!customerEvent || typeof customerEvent !== "object") return null; - // Try headers (API Gateway, ALB, Function URL) const headers = customerEvent.headers; if (headers && typeof headers === "object") { - const traceId = headers["x-datadog-trace-id"]; - const parentId = headers["x-datadog-parent-id"]; - if (traceId && parentId) { - const samplingPriority = headers["x-datadog-sampling-priority"] || "1"; - const tags = headers["x-datadog-tags"] || ""; - const ptid = parsePtid(tags); - logDebug(`Found upstream trace context in customer event headers`); - return { traceId, parentId, samplingPriority, ptid }; - } + return headers as Record; } - // Try _datadog field (direct invocation / Step Functions) const ddData = customerEvent._datadog; if (ddData && typeof ddData === "object") { - const traceId = ddData["x-datadog-trace-id"]; - const parentId = ddData["x-datadog-parent-id"]; - if (traceId && parentId) { - const samplingPriority = ddData["x-datadog-sampling-priority"] || "1"; - const tags = ddData["x-datadog-tags"] || ""; - const ptid = parsePtid(tags); - logDebug(`Found upstream trace context in customer event _datadog field`); - return { traceId, parentId, samplingPriority, ptid }; - } + return ddData as Record; } } catch (e) { - logDebug(`Failed to extract upstream trace context from durable event: ${e}`); + logDebug(`Failed to read upstream headers from durable input payload: ${e}`); } return null; } -/** - * Parse _dd.p.tid from x-datadog-tags string. - * Format: "_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0" - */ -function parsePtid(tags: string): string { - if (!tags) return ""; - for (const tag of tags.split(",")) { - if (tag.includes("_dd.p.tid=")) { - return tag.split("=")[1] || ""; - } - } - return ""; -} - /** * Durable Execution Trace Extractor * - * Strategy: - * 1. Prefer `_datadog_{N}` checkpoint context when present. - * 2. Otherwise, derive trace linkage from upstream customer event context. - * 3. If none exists, start a new random trace context. + * Locates trace headers carried inside the durable execution envelope and hands + * them to the standard dd-trace propagator via `TracerWrapper.extract`. Order: + * 1. Latest `_datadog_{N}` checkpoint payload. + * 2. Upstream customer event headers from `InputPayload`. + * 3. Otherwise return null and let the default extraction path take over. */ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + constructor(private tracerWrapper: TracerWrapper) {} + extract(event: unknown): SpanContextWrapper | null { if (!isDurableExecutionEvent(event)) { logDebug("Event is not a durable execution event"); return null; } - - const executionArn = event.DurableExecutionArn; - if (!executionArn) { + if (!event.DurableExecutionArn) { logDebug("No DurableExecutionArn in event"); return null; } - // --- Step 0: Prefer a previously-saved trace-context checkpoint --- - // If a previous invocation saved a `_datadog_{N}` checkpoint, use - // the one with the highest N — it reflects the latest trace-context state - // of the ongoing durable execution. Same scheme as dd-trace-py. - const latestCheckpoint = findLatestTraceContextCheckpoint(event); - if (latestCheckpoint) { - logDebug( - `Using trace context from checkpoint ${latestCheckpoint.name}`, - ); - const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; - const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; - const samplingPriorityStr = latestCheckpoint.headers["x-datadog-sampling-priority"] || "1"; - const tagsStr = latestCheckpoint.headers["x-datadog-tags"] || ""; - let ptidFromTags = parsePtid(tagsStr); - let effectiveTraceId = traceIdStr; - let effectiveParentId = parentIdStr; - - if ((!effectiveTraceId || !effectiveParentId) && latestCheckpoint.headers.traceparent) { - const parsedTraceparent = parseTraceparentHex(latestCheckpoint.headers.traceparent); - if (parsedTraceparent) { - effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; - effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; - ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; - } - } - - const normalizedTraceId = normalizeTraceIdToDecimal(effectiveTraceId); - const normalizedParentId = normalizeParentIdToDecimal(effectiveParentId); - if (!ptidFromTags && normalizedTraceId.ptidFromTraceId) { - ptidFromTags = normalizedTraceId.ptidFromTraceId; - } - - if (normalizedTraceId.traceId && normalizedParentId) { - try { - const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); - const id = require("dd-trace/packages/dd-trace/src/id"); - - const ddSpanContext = new _DatadogSpanContext({ - traceId: id(normalizedTraceId.traceId, 10), - spanId: id(normalizedParentId, 10), - sampling: { priority: samplingPriorityStr }, - }); - - if (ptidFromTags) { - ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; - } - return new SpanContextWrapper(ddSpanContext, TraceSource.Event); - } catch (e) { - logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); - const fallback = SpanContextWrapper.fromTraceContext({ - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - sampleMode: parseInt(samplingPriorityStr, 10), - source: TraceSource.Event, - }); - if (fallback) { - return fallback; - } - } - } + const checkpointHeaders = findLatestCheckpointHeaders(event); + if (checkpointHeaders) { + logDebug("Extracting trace context from durable checkpoint"); + return this.tracerWrapper.extract(checkpointHeaders); } - // --- Step 1: Try to use real upstream trace context --- - const upstream = extractUpstreamTraceContext(event); - - let traceId: string; - let ptid: string; - const rootSpanId = generateRandomPositiveId(); - let samplingPriority: string; - - if (upstream) { - const normalizedUpstreamTrace = normalizeTraceIdToDecimal(upstream.traceId); - const normalizedTraceId = normalizedUpstreamTrace.traceId; - - if (normalizedTraceId) { - traceId = normalizedTraceId; - ptid = upstream.ptid || normalizedUpstreamTrace.ptidFromTraceId || ""; - samplingPriority = upstream.samplingPriority; - logDebug(`Using upstream trace_id=${traceId}, _dd.p.tid=${ptid}`); - } else { - const randomTrace = generateRandomTraceId128(); - traceId = randomTrace.traceId; - ptid = randomTrace.ptid; - samplingPriority = SampleMode.AUTO_KEEP.toString(); - logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); - } - - } else { - // --- Step 2: No checkpoint and no upstream context --- - // Start a new trace and create a random durable root span id that - // checkpoints will carry across subsequent invocations. - const randomTrace = generateRandomTraceId128(); - traceId = randomTrace.traceId; - ptid = randomTrace.ptid; - samplingPriority = SampleMode.AUTO_KEEP.toString(); - - logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + const upstreamHeaders = findUpstreamHeaders(event); + if (upstreamHeaders) { + logDebug("Extracting trace context from upstream durable input payload"); + return this.tracerWrapper.extract(upstreamHeaders); } - logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); - - // Construct span context with _dd.p.tid for 128-bit W3C trace ID support - // Similar to Step Functions' approach in step-function-service.ts - try { - const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); - const id = require("dd-trace/packages/dd-trace/src/id"); - - const ddSpanContext = new _DatadogSpanContext({ - traceId: id(traceId, 10), - spanId: id(rootSpanId, 10), - sampling: { priority: samplingPriority }, - }); - - // Set _dd.p.tid for upper 64 bits of 128-bit trace ID - if (ptid) { - ddSpanContext._trace.tags["_dd.p.tid"] = ptid; - } - - return new SpanContextWrapper(ddSpanContext, TraceSource.Event); - } catch (error) { - if (error instanceof Error) { - logDebug("Couldn't generate SpanContext with tracer, falling back.", error); - } - } - - // Fallback without _dd.p.tid if dd-trace is not available - return SpanContextWrapper.fromTraceContext({ - traceId, - parentId: rootSpanId, - sampleMode: parseInt(samplingPriority, 10), - source: TraceSource.Event, - }); + logDebug("No durable trace context found; deferring to default extraction"); + return null; } } @@ -680,14 +417,15 @@ export function createDurableExecutionRootSpan( logDebug(`Failed to set durable root span_id: ${e}`); } - // Fix parent_id: the active context has span_id=root_span_id (set by - // DurableExecutionEventTraceExtractor.extract), so tracer.startSpan() - // inherits that as parent_id, causing self-parenting. The root span's - // parent should be the upstream caller (if extracted) or 0 (true root). + // Fix parent_id: when an extracted span context exists, tracer.startSpan() + // inherits its span_id as parent_id and we just overwrote our own span_id + // to match — that would self-parent. The root span's parent should be the + // upstream caller (if any) or 0 (true root). try { - const upstream = extractUpstreamTraceContext(event as DurableExecutionEvent); - if (upstream) { - span.context()._parentId = id(upstream.parentId, 10); + const upstreamHeaders = findUpstreamHeaders(event as DurableExecutionEvent); + const upstreamParentId = upstreamHeaders?.["x-datadog-parent-id"]; + if (upstreamParentId) { + span.context()._parentId = id(String(upstreamParentId), 10); } else { span.context()._parentId = id("0", 10); } diff --git a/src/trace/listener.ts b/src/trace/listener.ts index d3e8d8136..07ef4faed 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -318,7 +318,6 @@ export class TraceListener { } // Finish the durable execution root span after all other spans. - // Re-emitted every invocation; the last one wins in the backend. if (this.durableRootSpan) { this.durableRootSpan.finish(); this.durableRootSpan = undefined; From 0d5ab4ade5d259e1dce8b7d1b56c0a66b8f9c91e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:32:27 -0400 Subject: [PATCH 06/15] clean up AI complicated stuff... --- .../context/extractors/durable-execution.ts | 64 +++---------------- src/trace/listener.ts | 9 ++- 2 files changed, 15 insertions(+), 58 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 9865eb0df..39ac2b23c 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -12,7 +12,6 @@ * so we just hand the resulting header dict back to `tracer.extract` here. */ -import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; import { TracerWrapper } from "../../tracer-wrapper"; @@ -107,21 +106,6 @@ export function getCheckpointToken(event: unknown): string | undefined { return event.CheckpointToken; } -function generateRandomPositiveId(): string { - const bytes = randomBytes(8); - bytes[0] = bytes[0] & 0x7f; // keep it positive int64 - const value = bufferToBigInt(bytes); - return value === 0n ? "1" : value.toString(10); -} - -function bufferToBigInt(buf: Buffer): bigint { - let result = 0n; - for (let i = 0; i < buf.length; i++) { - result = (result << 8n) | BigInt(buf[i]); - } - return result; -} - // Terminal operation statuses that indicate an operation has completed const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); @@ -362,25 +346,21 @@ export function createDurableExecutionRootSpan( return null; } - const rootSpanId = extractedRootContext?.toSpanId() || generateRandomPositiveId(); - - // Determine consistent start_time from the first operation's StartTimestamp - // StartTimestamp is unix milliseconds from the durable execution SDK + // Use the first operation's StartTimestamp (unix milliseconds) so the root + // span's start time matches the actual start of the durable execution. let startTime: number | undefined; - const replayOperations = event.InitialExecutionState?.Operations; - if (replayOperations && replayOperations.length > 0) { - const firstStartTs = replayOperations[0].StartTimestamp; + if (operations && operations.length > 0) { + const firstStartTs = operations[0].StartTimestamp; if (firstStartTs != null) { const parsed = Number(firstStartTs); if (!isNaN(parsed)) { - startTime = parsed; // already in millis, dd-trace startSpan expects millis + startTime = parsed; } } } try { const tracer = require("dd-trace"); - const id = require("dd-trace/packages/dd-trace/src/id"); const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; @@ -392,7 +372,7 @@ export function createDurableExecutionRootSpan( "resource.name": resourceName, "durable.execution_arn": executionArn, "durable.is_root_span": true, - "durable.invocation_count": replayOperations?.length ?? 0, + "durable.invocation_count": operations?.length ?? 0, }, }; @@ -400,40 +380,14 @@ export function createDurableExecutionRootSpan( spanOptions.startTime = startTime; } if (extractedRootContext?.spanContext) { - // Ensure the durable root span stays in the same trace as the extracted - // durable invocation context even when there is no active scope. + // Stay in the same trace as the upstream caller even when there is no + // active scope yet. aws.lambda will be parented to this span downstream. spanOptions.childOf = extractedRootContext.spanContext; } const span = tracer.startSpan("aws.durable-execution", spanOptions); - // Use the extracted durable root span_id when available to keep the - // durable root identity stable with propagated checkpoint context. - try { - if (rootSpanId) { - span.context()._spanId = id(rootSpanId, 10); - } - } catch (e) { - logDebug(`Failed to set durable root span_id: ${e}`); - } - - // Fix parent_id: when an extracted span context exists, tracer.startSpan() - // inherits its span_id as parent_id and we just overwrote our own span_id - // to match — that would self-parent. The root span's parent should be the - // upstream caller (if any) or 0 (true root). - try { - const upstreamHeaders = findUpstreamHeaders(event as DurableExecutionEvent); - const upstreamParentId = upstreamHeaders?.["x-datadog-parent-id"]; - if (upstreamParentId) { - span.context()._parentId = id(String(upstreamParentId), 10); - } else { - span.context()._parentId = id("0", 10); - } - } catch (e) { - logDebug(`Failed to set root span parent_id: ${e}`); - } - - logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); + logDebug(`Created root execution span: start_time=${startTime}`); return { span, diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 07ef4faed..2c526532e 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -148,19 +148,22 @@ export class TraceListener { // Create the durable execution root span before everything else so later // spans can parent correctly. Root creation is gated in // createDurableExecutionRootSpan() and only happens for likely first - // invocations; replay invocations return null. + // invocations; replay invocations return null. The root span inherits + // trace_id and sampling from spanContextWrapper, so anything parented to + // it (aws.lambda below) joins the same trace automatically. this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; + const durableRootSpanContext = this.durableRootSpan?.span?.context(); if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, - parentSpanContext, + durableRootSpanContext || parentSpanContext, this.config.encodeAuthorizerContext, ); } - this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext; + this.lambdaSpanParentContext = this.inferredSpan?.span || durableRootSpanContext || parentSpanContext; this.context = context; const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); From c625b2412be414edde2b22e386c99abb8577f935 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 1 May 2026 14:00:50 -0400 Subject: [PATCH 07/15] rearrange and enhancements --- .../extractors/durable-execution.spec.ts | 12 +++++----- .../context/extractors/durable-execution.ts | 23 +++++++------------ src/trace/listener.ts | 21 ++--------------- 3 files changed, 16 insertions(+), 40 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index b2353795b..fd8e46fc2 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -116,12 +116,12 @@ describe("DurableExecutionEventTraceExtractor", () => { CheckpointToken: "t-replay", InitialExecutionState: { Operations: [ - { - Id: "op-1", - Name: "_dd_trace_context_0", - Status: "SUCCEEDED", - StepDetails: { - Result: JSON.stringify({ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ "x-datadog-trace-id": "149750110124521191", "x-datadog-parent-id": "538591322263933970", "x-datadog-sampling-priority": "1", diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 39ac2b23c..5fb6c2a55 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -110,19 +110,12 @@ export function getCheckpointToken(event: unknown): string | undefined { const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; -const LEGACY_TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; -const TRACE_CHECKPOINT_NAME_PREFIXES = [ - TRACE_CHECKPOINT_NAME_PREFIX, - LEGACY_TRACE_CHECKPOINT_NAME_PREFIX, -]; function parseTraceCheckpointNumber(name: unknown): number | null { if (typeof name !== "string") return null; - const prefix = TRACE_CHECKPOINT_NAME_PREFIXES.find((candidate) => name.startsWith(candidate)); - if (!prefix) return null; - - const suffix = name.slice(prefix.length); + if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); const n = Number.parseInt(suffix, 10); if (Number.isNaN(n) || String(n) !== suffix) return null; return n; @@ -141,7 +134,6 @@ function isTraceCheckpointName(name: unknown): boolean { * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the * payload is a standard HTTP-style header dict. * - * Also accepts legacy `_dd_trace_context_{N}` names for compatibility. */ function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { const operations = event.InitialExecutionState?.Operations; @@ -319,12 +311,14 @@ export function getCompletedOperationCount(event: unknown): number { * - no operation has terminal status * - operation count is <= 1 * + * The created span is parented to the current aws.lambda span context. + * * Returns an object with { span, finish() } or null if not a durable execution. * Caller must call finish() when the invocation ends. */ export function createDurableExecutionRootSpan( event: unknown, - extractedRootContext?: SpanContextWrapper | null, + parentSpanContext?: unknown, ): { span: any; finish: () => void } | null { if (!isDurableExecutionEvent(event)) { return null; @@ -379,10 +373,9 @@ export function createDurableExecutionRootSpan( if (startTime !== undefined) { spanOptions.startTime = startTime; } - if (extractedRootContext?.spanContext) { - // Stay in the same trace as the upstream caller even when there is no - // active scope yet. aws.lambda will be parented to this span downstream. - spanOptions.childOf = extractedRootContext.spanContext; + if (parentSpanContext) { + // Root span is modeled as a child of aws.lambda. + spanOptions.childOf = parentSpanContext; } const span = tracer.startSpan("aws.durable-execution", spanOptions); diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 2c526532e..f068c5352 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -27,7 +27,6 @@ import { } from "./durable-function-context"; import { XrayService } from "./xray-service"; import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http"; -import { createDurableExecutionRootSpan } from "./context/extractors/durable-execution"; import { getSpanPointerAttributes, SpanPointerAttributes } from "../utils/span-pointers"; export type TraceExtractor = (event: any, context: Context) => Promise | TraceContext; @@ -103,7 +102,6 @@ export class TraceListener { private wrappedCurrentSpan?: SpanWrapper; private triggerTags?: { [key: string]: string }; private lambdaSpanParentContext?: SpanContext; - private durableRootSpan?: { span: any; finish: () => void }; private spanPointerAttributesList: SpanPointerAttributes[] | undefined; public get currentTraceHeaders() { @@ -145,25 +143,16 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } - // Create the durable execution root span before everything else so later - // spans can parent correctly. Root creation is gated in - // createDurableExecutionRootSpan() and only happens for likely first - // invocations; replay invocations return null. The root span inherits - // trace_id and sampling from spanContextWrapper, so anything parented to - // it (aws.lambda below) joins the same trace automatically. - this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; - const durableRootSpanContext = this.durableRootSpan?.span?.context(); - if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, - durableRootSpanContext || parentSpanContext, + parentSpanContext, this.config.encodeAuthorizerContext, ); } - this.lambdaSpanParentContext = this.inferredSpan?.span || durableRootSpanContext || parentSpanContext; + this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext; this.context = context; const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); @@ -320,12 +309,6 @@ export class TraceListener { this.injectAuthorizerSpan(result, event?.requestContext?.requestId, finishTime || Date.now()); } - // Finish the durable execution root span after all other spans. - if (this.durableRootSpan) { - this.durableRootSpan.finish(); - this.durableRootSpan = undefined; - } - // Reset singletons and trace context this.stepFunctionContext = undefined; this.durableFunctionContext = undefined; From 4401d6aa7693572c7ddf1eec98fd3e3e367c6d9c Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 7 May 2026 23:06:32 -0400 Subject: [PATCH 08/15] build: support dd-trace package override and filter publish layer check - build_layers.sh: add DD_TRACE_PACKAGE, DD_TRACE_COMMIT, and DD_TRACE_COMMIT_BRANCH env vars to override the dd-trace dependency used inside the layer (useful for git-bisect / branch testing). The resolved spec is forwarded to docker as a build-arg. - Dockerfile: accept an optional `dd_trace_package` build-arg. When set, rewrite package.json's dd-trace dep to the override spec before yarn install. The rewrite is gated on the build-arg being non-empty, so default builds are unchanged. - publish_layers.sh: only require layer zips for the layers in LAYERS, rather than unconditionally requiring all 4 entries in LAYER_PATHS. Co-Authored-By: Claude Opus 4.7 --- Dockerfile | 4 ++++ scripts/build_layers.sh | 34 +++++++++++++++++++++++++++++++++- scripts/publish_layers.sh | 22 +++++++++++++--------- 3 files changed, 50 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index 760b7ab82..5e7d2e145 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,7 @@ ARG image FROM $image AS builder ARG image +ARG dd_trace_package # Create the directory structure required for AWS Lambda Layer RUN mkdir -p /nodejs/node_modules/ @@ -8,6 +9,9 @@ RUN mkdir -p /nodejs/node_modules/ # Install dev dependencies COPY . datadog-lambda-js WORKDIR /datadog-lambda-js +RUN if [ -n "$dd_trace_package" ]; then \ + node -e 'const fs = require("fs"); const pkg = JSON.parse(fs.readFileSync("package.json", "utf8")); const spec = process.argv[1]; if (pkg.devDependencies && pkg.devDependencies["dd-trace"]) { pkg.devDependencies["dd-trace"] = spec; } else { pkg.dependencies = pkg.dependencies || {}; pkg.dependencies["dd-trace"] = spec; } fs.writeFileSync("package.json", JSON.stringify(pkg, null, 2) + "\n");' "$dd_trace_package"; \ + fi RUN yarn install # Build the lambda layer diff --git a/scripts/build_layers.sh b/scripts/build_layers.sh index bd8786e8d..731c64163 100755 --- a/scripts/build_layers.sh +++ b/scripts/build_layers.sh @@ -6,6 +6,21 @@ # Copyright 2019 Datadog, Inc. # Builds Datadog node layers for lambda functions, using Docker +# +# Usage: +# NODE_VERSION=20.19 ./scripts/build_layers.sh +# +# dd-trace-js overrides (highest priority first): +# DD_TRACE_PACKAGE Exact package spec to use for dd-trace. +# DD_TRACE_COMMIT Specific dd-trace-js commit SHA to build from GitHub. +# DD_TRACE_COMMIT_BRANCH dd-trace-js branch name to build from GitHub. +# +# Examples: +# # Build a single layer for Node 20 +# NODE_VERSION=20.19 ./scripts/build_layers.sh +# +# # Build a single layer with dd-trace from a branch +# DD_TRACE_COMMIT_BRANCH=joey/cross-invocation-tracecontext-propagation NODE_VERSION=20.19 ./scripts/build_layers.sh set -e LAYER_DIR=".layers" @@ -27,6 +42,21 @@ else fi fi +function resolve_dd_trace_package { + if [ -n "$DD_TRACE_PACKAGE" ]; then + echo "$DD_TRACE_PACKAGE" + elif [ -n "$DD_TRACE_COMMIT" ]; then + echo "git+https://github.com/DataDog/dd-trace-js.git#$DD_TRACE_COMMIT" + elif [ -n "$DD_TRACE_COMMIT_BRANCH" ]; then + echo "git+https://github.com/DataDog/dd-trace-js.git#$DD_TRACE_COMMIT_BRANCH" + fi +} + +DD_TRACE_PACKAGE_SPEC=$(resolve_dd_trace_package) +if [ -n "$DD_TRACE_PACKAGE_SPEC" ]; then + echo "Using dd-trace package spec: $DD_TRACE_PACKAGE_SPEC" +fi + function make_path_absolute { echo "$(cd "$(dirname "$1")"; pwd)/$(basename "$1")" } @@ -40,7 +70,9 @@ function docker_build_zip { # between different node runtimes. temp_dir=$(mktemp -d) docker buildx build -t datadog-lambda-layer-node:$1 . --no-cache \ - --build-arg image=registry.ddbuild.io/images/mirror/node:${node_image_version}-bullseye --progress=plain -o $temp_dir/nodejs + --build-arg image=registry.ddbuild.io/images/mirror/node:${node_image_version}-bullseye \ + --build-arg dd_trace_package="$DD_TRACE_PACKAGE_SPEC" \ + --progress=plain -o $temp_dir/nodejs # Zip to destination, and keep directory structure as based in $temp_dir (cd $temp_dir && zip -q -r $destination ./) diff --git a/scripts/publish_layers.sh b/scripts/publish_layers.sh index 827d45451..ad4b964b9 100755 --- a/scripts/publish_layers.sh +++ b/scripts/publish_layers.sh @@ -31,15 +31,6 @@ if [[ ${#LAYER_PATHS[@]} -ne $expected_length ]] || \ exit 1 fi -# Check that the layer files exist -for layer_file in "${LAYER_PATHS[@]}" -do - if [ ! -f $layer_file ]; then - echo "Could not find $layer_file." - exit 1 - fi -done - # Determine the target regions if [ -z "$REGIONS" ]; then echo "Region not specified, running for all available regions." @@ -78,6 +69,19 @@ else echo "Layer version specified: $VERSION" fi +# Check that the layer files exist for the layers we'll actually publish +for layer_name in "${LAYERS[@]}"; do + for i in "${!AVAILABLE_LAYERS[@]}"; do + if [[ "${AVAILABLE_LAYERS[$i]}" = "${layer_name}" ]]; then + layer_file="${LAYER_PATHS[$i]}" + if [ ! -f "$layer_file" ]; then + echo "Could not find $layer_file." + exit 1 + fi + fi + done +done + read -p "Ready to publish version $VERSION of layers ${LAYERS[*]} to regions ${REGIONS[*]} (y/n)?" CONT if [ "$CONT" != "y" ]; then echo "Exiting" From 1652613852e9f6a486e8a864bfa362d11f8694de Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 15:47:28 -0400 Subject: [PATCH 09/15] revert some unrelated changes --- scripts/build_layers.sh | 36 ++---------------------------------- scripts/publish_layers.sh | 24 ++++++++++-------------- 2 files changed, 12 insertions(+), 48 deletions(-) diff --git a/scripts/build_layers.sh b/scripts/build_layers.sh index 731c64163..62e31db62 100755 --- a/scripts/build_layers.sh +++ b/scripts/build_layers.sh @@ -6,21 +6,6 @@ # Copyright 2019 Datadog, Inc. # Builds Datadog node layers for lambda functions, using Docker -# -# Usage: -# NODE_VERSION=20.19 ./scripts/build_layers.sh -# -# dd-trace-js overrides (highest priority first): -# DD_TRACE_PACKAGE Exact package spec to use for dd-trace. -# DD_TRACE_COMMIT Specific dd-trace-js commit SHA to build from GitHub. -# DD_TRACE_COMMIT_BRANCH dd-trace-js branch name to build from GitHub. -# -# Examples: -# # Build a single layer for Node 20 -# NODE_VERSION=20.19 ./scripts/build_layers.sh -# -# # Build a single layer with dd-trace from a branch -# DD_TRACE_COMMIT_BRANCH=joey/cross-invocation-tracecontext-propagation NODE_VERSION=20.19 ./scripts/build_layers.sh set -e LAYER_DIR=".layers" @@ -42,21 +27,6 @@ else fi fi -function resolve_dd_trace_package { - if [ -n "$DD_TRACE_PACKAGE" ]; then - echo "$DD_TRACE_PACKAGE" - elif [ -n "$DD_TRACE_COMMIT" ]; then - echo "git+https://github.com/DataDog/dd-trace-js.git#$DD_TRACE_COMMIT" - elif [ -n "$DD_TRACE_COMMIT_BRANCH" ]; then - echo "git+https://github.com/DataDog/dd-trace-js.git#$DD_TRACE_COMMIT_BRANCH" - fi -} - -DD_TRACE_PACKAGE_SPEC=$(resolve_dd_trace_package) -if [ -n "$DD_TRACE_PACKAGE_SPEC" ]; then - echo "Using dd-trace package spec: $DD_TRACE_PACKAGE_SPEC" -fi - function make_path_absolute { echo "$(cd "$(dirname "$1")"; pwd)/$(basename "$1")" } @@ -70,9 +40,7 @@ function docker_build_zip { # between different node runtimes. temp_dir=$(mktemp -d) docker buildx build -t datadog-lambda-layer-node:$1 . --no-cache \ - --build-arg image=registry.ddbuild.io/images/mirror/node:${node_image_version}-bullseye \ - --build-arg dd_trace_package="$DD_TRACE_PACKAGE_SPEC" \ - --progress=plain -o $temp_dir/nodejs + --build-arg image=registry.ddbuild.io/images/mirror/node:${node_image_version}-bullseye --progress=plain -o $temp_dir/nodejs # Zip to destination, and keep directory structure as based in $temp_dir (cd $temp_dir && zip -q -r $destination ./) @@ -92,4 +60,4 @@ do done echo "Done creating layers:" -ls $LAYER_DIR | xargs -I _ echo "$LAYER_DIR/_" +ls $LAYER_DIR | xargs -I _ echo "$LAYER_DIR/_" \ No newline at end of file diff --git a/scripts/publish_layers.sh b/scripts/publish_layers.sh index ad4b964b9..7a084e1f8 100755 --- a/scripts/publish_layers.sh +++ b/scripts/publish_layers.sh @@ -31,6 +31,15 @@ if [[ ${#LAYER_PATHS[@]} -ne $expected_length ]] || \ exit 1 fi +# Check that the layer files exist +for layer_file in "${LAYER_PATHS[@]}" +do + if [ ! -f $layer_file ]; then + echo "Could not find $layer_file." + exit 1 + fi +done + # Determine the target regions if [ -z "$REGIONS" ]; then echo "Region not specified, running for all available regions." @@ -69,19 +78,6 @@ else echo "Layer version specified: $VERSION" fi -# Check that the layer files exist for the layers we'll actually publish -for layer_name in "${LAYERS[@]}"; do - for i in "${!AVAILABLE_LAYERS[@]}"; do - if [[ "${AVAILABLE_LAYERS[$i]}" = "${layer_name}" ]]; then - layer_file="${LAYER_PATHS[$i]}" - if [ ! -f "$layer_file" ]; then - echo "Could not find $layer_file." - exit 1 - fi - fi - done -done - read -p "Ready to publish version $VERSION of layers ${LAYERS[*]} to regions ${REGIONS[*]} (y/n)?" CONT if [ "$CONT" != "y" ]; then echo "Exiting" @@ -172,4 +168,4 @@ do done done wait_for_processes -echo "Done !" +echo "Done !" \ No newline at end of file From dfa5e4b0b5ca0a511837327de70eedb9d570f7df Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 15:49:02 -0400 Subject: [PATCH 10/15] revert irrelavant changes --- scripts/build_layers.sh | 2 +- scripts/publish_layers.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/build_layers.sh b/scripts/build_layers.sh index 62e31db62..bd8786e8d 100755 --- a/scripts/build_layers.sh +++ b/scripts/build_layers.sh @@ -60,4 +60,4 @@ do done echo "Done creating layers:" -ls $LAYER_DIR | xargs -I _ echo "$LAYER_DIR/_" \ No newline at end of file +ls $LAYER_DIR | xargs -I _ echo "$LAYER_DIR/_" diff --git a/scripts/publish_layers.sh b/scripts/publish_layers.sh index 7a084e1f8..827d45451 100755 --- a/scripts/publish_layers.sh +++ b/scripts/publish_layers.sh @@ -168,4 +168,4 @@ do done done wait_for_processes -echo "Done !" \ No newline at end of file +echo "Done !" From 8a19b6430e146d18bf9c57cd784110b0272bc247 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 16:19:03 -0400 Subject: [PATCH 11/15] lint --- .../extractors/durable-execution.spec.ts | 21 ++++++++----------- .../context/extractors/durable-execution.ts | 8 ++----- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index fd8e46fc2..a83385278 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -18,8 +18,7 @@ describe("DurableExecutionEventTraceExtractor", () => { }); it("delegates checkpoint headers to the standard propagator", () => { - const executionArn = - "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; const checkpointHeaders = { "x-datadog-trace-id": "149750110124521191", @@ -68,8 +67,7 @@ describe("DurableExecutionEventTraceExtractor", () => { }); it("creates durable root span only for first invocation", () => { - const executionArn = - "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; + const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; const spanContext: any = { _spanId: null, @@ -108,20 +106,19 @@ describe("DurableExecutionEventTraceExtractor", () => { }); it("skips durable root span creation on replay invocations", () => { - const executionArn = - "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; + const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; const replayEvent = { DurableExecutionArn: executionArn, CheckpointToken: "t-replay", InitialExecutionState: { Operations: [ - { - Id: "op-1", - Name: "_datadog_0", - Status: "SUCCEEDED", - StepDetails: { - Result: JSON.stringify({ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ "x-datadog-trace-id": "149750110124521191", "x-datadog-parent-id": "538591322263933970", "x-datadog-sampling-priority": "1", diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 5fb6c2a55..ca0d240b4 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -298,9 +298,7 @@ export function getCompletedOperationCount(event: unknown): number { return 0; } - return operations.filter((op) => - op.Status === "SUCCEEDED" || op.Status === "FAILED" - ).length; + return operations.filter((op) => op.Status === "SUCCEEDED" || op.Status === "FAILED").length; } /** @@ -330,9 +328,7 @@ export function createDurableExecutionRootSpan( } const operations = event.InitialExecutionState?.Operations; - const hasCheckpoint = Boolean( - operations?.some((op) => isTraceCheckpointName(op?.Name)), - ); + const hasCheckpoint = Boolean(operations?.some((op) => isTraceCheckpointName(op?.Name))); const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; From e54a187a80cb47740d8ff7862182e18708af5a84 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 16:21:02 -0400 Subject: [PATCH 12/15] revert dockerfile --- Dockerfile | 3 --- 1 file changed, 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5e7d2e145..c562c96c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,9 +9,6 @@ RUN mkdir -p /nodejs/node_modules/ # Install dev dependencies COPY . datadog-lambda-js WORKDIR /datadog-lambda-js -RUN if [ -n "$dd_trace_package" ]; then \ - node -e 'const fs = require("fs"); const pkg = JSON.parse(fs.readFileSync("package.json", "utf8")); const spec = process.argv[1]; if (pkg.devDependencies && pkg.devDependencies["dd-trace"]) { pkg.devDependencies["dd-trace"] = spec; } else { pkg.dependencies = pkg.dependencies || {}; pkg.dependencies["dd-trace"] = spec; } fs.writeFileSync("package.json", JSON.stringify(pkg, null, 2) + "\n");' "$dd_trace_package"; \ - fi RUN yarn install # Build the lambda layer From 3cef4f5f7c0a4fd54f4c78b2366c0cdf3c6b3a0b Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 16:21:38 -0400 Subject: [PATCH 13/15] revert dockerfile --- Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index c562c96c4..760b7ab82 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,6 @@ ARG image FROM $image AS builder ARG image -ARG dd_trace_package # Create the directory structure required for AWS Lambda Layer RUN mkdir -p /nodejs/node_modules/ From 08bf22e5527857444cc93cb915d503237865f264 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 15 May 2026 16:51:46 -0400 Subject: [PATCH 14/15] refactor(durable-execution): drop root span, force datadog-only checkpoint extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two changes to align with the dd-trace-js cross-invocation tracecontext propagation branch (joey/cross-invocation-tracecontext-propagation): 1. Remove createDurableExecutionRootSpan. This integration no longer constructs a synthetic aws.durable-execution root span — the canonical entry point for a durable execution is the first aws.durable.execute span created by the dd-trace-js plugin. The Lambda wrapper just extracts the saved trace context and lets the aws.lambda span (and downstream plugin spans) anchor to it. The removed function had no production caller in this repo; only spec-only references existed. 2. Add TracerWrapper.extractDatadogOnly and use it for the _datadog_{N} checkpoint extraction path. Checkpoints are written by dd-trace-js in Datadog style only (regardless of DD_TRACE_PROPAGATION_STYLE_INJECT), so we must extract them with a matching forced-datadog propagator. Built by shadowing tracePropagationStyle.extract = ['datadog'] on a sibling propagator via the live text_map propagator's constructor, so we don't bind to a dd-trace internal module path. Upstream customer-event headers continue to use the standard extract since they come from arbitrary services with arbitrary propagation styles. --- .../extractors/durable-execution.spec.ts | 120 ++++++------------ .../context/extractors/durable-execution.ts | 111 ++-------------- src/trace/context/extractors/index.ts | 2 +- src/trace/tracer-wrapper.ts | 67 ++++++++++ 4 files changed, 121 insertions(+), 179 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index a83385278..2a27c4441 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,23 +1,19 @@ -import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; +import { DurableExecutionEventTraceExtractor } from "./durable-execution"; import { TracerWrapper } from "../../tracer-wrapper"; -jest.mock("dd-trace", () => ({ - startSpan: jest.fn(), -})); - -function makeTracerWrapper(extractReturn: any = null): TracerWrapper { - return { extract: jest.fn().mockReturnValue(extractReturn) } as unknown as TracerWrapper; +function makeTracerWrapper(opts: { datadogOnly?: any; standard?: any } = {}): TracerWrapper { + return { + extract: jest.fn().mockReturnValue(opts.standard ?? null), + extractDatadogOnly: jest.fn().mockReturnValue(opts.datadogOnly ?? null), + } as unknown as TracerWrapper; } describe("DurableExecutionEventTraceExtractor", () => { - const tracer = require("dd-trace"); - const startSpanMock = tracer.startSpan as jest.Mock; - beforeEach(() => { jest.clearAllMocks(); }); - it("delegates checkpoint headers to the standard propagator", () => { + it("extracts checkpoint headers with the datadog-only propagator", () => { const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; const checkpointHeaders = { @@ -44,102 +40,66 @@ describe("DurableExecutionEventTraceExtractor", () => { }; const sentinelContext = { sentinel: true }; - const tracerWrapper = makeTracerWrapper(sentinelContext); + const tracerWrapper = makeTracerWrapper({ datadogOnly: sentinelContext }); const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const context = extractor.extract(event); - expect(tracerWrapper.extract).toHaveBeenCalledWith(checkpointHeaders); - expect(context).toBe(sentinelContext); - }); - - it("returns null when no checkpoint or upstream context exists", () => { - const tracerWrapper = makeTracerWrapper(); - const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); - - const context = extractor.extract({ - DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", - CheckpointToken: "t-empty", - InitialExecutionState: { Operations: [] }, - }); - - expect(context).toBeNull(); + // Checkpoints are written by dd-trace-js in Datadog style only — extract + // must use the matching forced-datadog propagator, not the user-configured one. + expect(tracerWrapper.extractDatadogOnly).toHaveBeenCalledWith(checkpointHeaders); expect(tracerWrapper.extract).not.toHaveBeenCalled(); + expect(context).toBe(sentinelContext); }); - it("creates durable root span only for first invocation", () => { - const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; + it("falls back to standard extract for upstream customer headers", () => { + const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/upstream"; - const spanContext: any = { - _spanId: null, - _parentId: null, - toTraceId: () => "1111111111111111111", - toSpanId: () => "2222222222222222222", - }; - const span = { - context: () => spanContext, - finish: jest.fn(), + const upstreamHeaders = { + "x-datadog-trace-id": "111", + traceparent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01", }; - startSpanMock.mockReturnValue(span); - const firstInvocationEvent = { + const event = { DurableExecutionArn: executionArn, - CheckpointToken: "t-first", + CheckpointToken: "t-upstream", InitialExecutionState: { Operations: [ { Id: "op-1", Name: "input", Status: "RUNNING", - StartTimestamp: 1710000000000, ExecutionDetails: { - InputPayload: JSON.stringify({ hello: "world" }), + InputPayload: JSON.stringify({ headers: upstreamHeaders }), }, }, ], }, }; - const root = createDurableExecutionRootSpan(firstInvocationEvent, null); + const sentinelContext = { sentinel: "upstream" }; + const tracerWrapper = makeTracerWrapper({ standard: sentinelContext }); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + const context = extractor.extract(event); - expect(root).not.toBeNull(); - expect(startSpanMock).toHaveBeenCalledTimes(1); + // Upstream headers come from arbitrary services; honor the user's + // propagation-style configuration here. + expect(tracerWrapper.extract).toHaveBeenCalledWith(upstreamHeaders); + expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled(); + expect(context).toBe(sentinelContext); }); - it("skips durable root span creation on replay invocations", () => { - const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; - - const replayEvent = { - DurableExecutionArn: executionArn, - CheckpointToken: "t-replay", - InitialExecutionState: { - Operations: [ - { - Id: "op-1", - Name: "_datadog_0", - Status: "SUCCEEDED", - StepDetails: { - Result: JSON.stringify({ - "x-datadog-trace-id": "149750110124521191", - "x-datadog-parent-id": "538591322263933970", - "x-datadog-sampling-priority": "1", - }), - }, - }, - { - Id: "op-2", - Name: "callback_step_prepare", - Status: "SUCCEEDED", - }, - ], - }, - }; - - const tracerWrapper = makeTracerWrapper({ source: "Event" }); + it("returns null when no checkpoint or upstream context exists", () => { + const tracerWrapper = makeTracerWrapper(); const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); - const extracted = extractor.extract(replayEvent); - const root = createDurableExecutionRootSpan(replayEvent, extracted); - expect(root).toBeNull(); - expect(startSpanMock).not.toHaveBeenCalled(); + const context = extractor.extract({ + DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", + CheckpointToken: "t-empty", + InitialExecutionState: { Operations: [] }, + }); + + expect(context).toBeNull(); + expect(tracerWrapper.extract).not.toHaveBeenCalled(); + expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled(); }); }); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index ca0d240b4..f171c9430 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -7,9 +7,17 @@ * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. * 3. If neither exists, return null and let the default extraction path create the context. * - * The dd-trace-js durable-execution plugin writes checkpoint headers via the - * standard HTTP propagator (`tracer.inject(span, 'http_headers', headers)`), - * so we just hand the resulting header dict back to `tracer.extract` here. + * The extracted context becomes the parent of the `aws.lambda` span (and any + * downstream spans created by dd-trace-js, including `aws.durable.execute`). + * This integration no longer creates a separate root span — anchoring to the + * first `aws.durable.execute` span in dd-trace-js is the canonical entry point + * for a durable execution. + * + * The dd-trace-js plugin writes checkpoint headers in **Datadog style only** + * (regardless of `DD_TRACE_PROPAGATION_STYLE_INJECT`), so we extract them with + * a matching forced-datadog propagator via `TracerWrapper.extractDatadogOnly`. + * Upstream customer-event headers come from arbitrary services and continue to + * be extracted with the user-configured style via `TracerWrapper.extract`. */ import { logDebug } from "../../../utils"; @@ -121,10 +129,6 @@ function parseTraceCheckpointNumber(name: unknown): number | null { return n; } -function isTraceCheckpointName(name: unknown): boolean { - return parseTraceCheckpointNumber(name) !== null; -} - /** * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return * its parsed header dict. @@ -218,8 +222,8 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor const checkpointHeaders = findLatestCheckpointHeaders(event); if (checkpointHeaders) { - logDebug("Extracting trace context from durable checkpoint"); - return this.tracerWrapper.extract(checkpointHeaders); + logDebug("Extracting trace context from durable checkpoint (datadog-only)"); + return this.tracerWrapper.extractDatadogOnly(checkpointHeaders); } const upstreamHeaders = findUpstreamHeaders(event); @@ -301,92 +305,3 @@ export function getCompletedOperationCount(event: unknown): number { return operations.filter((op) => op.Status === "SUCCEEDED" || op.Status === "FAILED").length; } -/** - * Create the durable execution root span for likely first invocations only. - * - * Replay invocations return null. The current first-invocation heuristic is: - * - no trace checkpoint operation exists - * - no operation has terminal status - * - operation count is <= 1 - * - * The created span is parented to the current aws.lambda span context. - * - * Returns an object with { span, finish() } or null if not a durable execution. - * Caller must call finish() when the invocation ends. - */ -export function createDurableExecutionRootSpan( - event: unknown, - parentSpanContext?: unknown, -): { span: any; finish: () => void } | null { - if (!isDurableExecutionEvent(event)) { - return null; - } - - const executionArn = event.DurableExecutionArn; - if (!executionArn) { - return null; - } - - const operations = event.InitialExecutionState?.Operations; - const hasCheckpoint = Boolean(operations?.some((op) => isTraceCheckpointName(op?.Name))); - const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); - const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; - - if (!isLikelyFirstInvocation) { - return null; - } - - // Use the first operation's StartTimestamp (unix milliseconds) so the root - // span's start time matches the actual start of the durable execution. - let startTime: number | undefined; - if (operations && operations.length > 0) { - const firstStartTs = operations[0].StartTimestamp; - if (firstStartTs != null) { - const parsed = Number(firstStartTs); - if (!isNaN(parsed)) { - startTime = parsed; - } - } - } - - try { - const tracer = require("dd-trace"); - - const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; - const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; - - const spanOptions: Record = { - type: "serverless", - tags: { - "service.name": serviceName, - "resource.name": resourceName, - "durable.execution_arn": executionArn, - "durable.is_root_span": true, - "durable.invocation_count": operations?.length ?? 0, - }, - }; - - if (startTime !== undefined) { - spanOptions.startTime = startTime; - } - if (parentSpanContext) { - // Root span is modeled as a child of aws.lambda. - spanOptions.childOf = parentSpanContext; - } - - const span = tracer.startSpan("aws.durable-execution", spanOptions); - - logDebug(`Created root execution span: start_time=${startTime}`); - - return { - span, - finish: () => { - span.finish(); - logDebug("Finished root execution span"); - }, - }; - } catch (e) { - logDebug(`Failed to create durable execution root span: ${e}`); - return null; - } -} diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 0fbbf081b..49e097f0f 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -9,4 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; -export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay, createDurableExecutionRootSpan } from "./durable-execution"; +export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay } from "./durable-execution"; diff --git a/src/trace/tracer-wrapper.ts b/src/trace/tracer-wrapper.ts index 1d02b5caa..c93232b65 100644 --- a/src/trace/tracer-wrapper.ts +++ b/src/trace/tracer-wrapper.ts @@ -27,6 +27,11 @@ export interface TraceOptions { // This lets a customer bring their own version of the tracer. export class TracerWrapper { private tracer: any; + // Cached propagator that extracts only Datadog-style headers (`x-datadog-*`) + // regardless of `DD_TRACE_PROPAGATION_STYLE_EXTRACT`. Built lazily on first + // datadog-only extract because the underlying tracer config is mutated + // during dd-trace init and only stable after the user's handler is invoked. + private datadogOnlyPropagator: any | null | undefined; constructor() { try { @@ -67,6 +72,68 @@ export class TracerWrapper { return spanContext; } + /** + * Extract a span context from a header dict using **only** the Datadog + * propagation style, ignoring `DD_TRACE_PROPAGATION_STYLE_EXTRACT`. + * + * Use this for carriers we know are written by Datadog code (e.g. the + * `_datadog_{N}` trace-context checkpoints written by the + * datadog-plugin-aws-durable-execution-sdk-js plugin). For carriers + * originating from arbitrary upstream services, use `extract` instead so + * the user's propagation-style configuration is honored. + */ + public extractDatadogOnly(headers: any): SpanContextWrapper | null { + if (!this.isTracerAvailable) { + return null; + } + + const propagator = this.getOrBuildDatadogOnlyPropagator(); + if (!propagator) { + // Fallback: if we couldn't reach into dd-trace internals to build a + // forced-datadog propagator, defer to the standard extract. This is + // strictly less correct (honors user style) but better than dropping + // the context entirely. + return this.extract(headers); + } + + try { + const extractedSpanContext = propagator.extract(headers); + if (!extractedSpanContext) return null; + return new SpanContextWrapper(extractedSpanContext, TraceSource.Event); + } catch (err) { + if (err instanceof Object || err instanceof Error) { + logDebug("Datadog-only extract failed", err); + } + return null; + } + } + + private getOrBuildDatadogOnlyPropagator(): any | null { + if (this.datadogOnlyPropagator !== undefined) return this.datadogOnlyPropagator; + this.datadogOnlyPropagator = null; + try { + const innerTracer = this.tracer?._tracer; + const config = innerTracer?._config; + const existing = innerTracer?._propagators?.text_map; + if (!config || !existing) return null; + // Shadow tracePropagationStyle.extract while inheriting every other + // field (baggage limits, x-datadog-tags length cap, etc.). + const shadow = Object.create(config); + shadow.tracePropagationStyle = { + ...config.tracePropagationStyle, + extract: ["datadog"], + }; + // Build a sibling TextMapPropagator via the live propagator's constructor + // so we don't have to bind to a dd-trace internal module path. + this.datadogOnlyPropagator = new existing.constructor(shadow); + } catch (err) { + if (err instanceof Object || err instanceof Error) { + logDebug("Failed to build datadog-only propagator", err); + } + } + return this.datadogOnlyPropagator; + } + public wrap any>(name: string, options: TraceOptions, fn: T) { if (!this.isTracerAvailable) { return fn; From 1f4e7eae32b03e850bfaad68d091470e18f9ef9f Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Sat, 16 May 2026 15:42:18 -0400 Subject: [PATCH 15/15] refactor(durable-execution): drop upstream-headers extraction path We don't currently expect tracecontext to be carried on the customer event that triggers a durable execution, so the upstream-headers branch had no real input to operate on. Drop the call site (which referenced the deleted findUpstreamHeaders helper and was a compile error after the partial removal) along with the corresponding test and the now-unused `standard` return mock in the test helper. The extractor now follows a single path: pull the latest _datadog_{N} checkpoint payload and hand it to TracerWrapper.extractDatadogOnly. --- .../extractors/durable-execution.spec.ts | 48 ++-------------- .../context/extractors/durable-execution.ts | 57 ++----------------- 2 files changed, 8 insertions(+), 97 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index 2a27c4441..8a7f8e6ed 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,10 +1,9 @@ import { DurableExecutionEventTraceExtractor } from "./durable-execution"; import { TracerWrapper } from "../../tracer-wrapper"; -function makeTracerWrapper(opts: { datadogOnly?: any; standard?: any } = {}): TracerWrapper { +function makeTracerWrapper(datadogOnlyReturn: any = null): TracerWrapper { return { - extract: jest.fn().mockReturnValue(opts.standard ?? null), - extractDatadogOnly: jest.fn().mockReturnValue(opts.datadogOnly ?? null), + extractDatadogOnly: jest.fn().mockReturnValue(datadogOnlyReturn), } as unknown as TracerWrapper; } @@ -40,55 +39,17 @@ describe("DurableExecutionEventTraceExtractor", () => { }; const sentinelContext = { sentinel: true }; - const tracerWrapper = makeTracerWrapper({ datadogOnly: sentinelContext }); + const tracerWrapper = makeTracerWrapper(sentinelContext); const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const context = extractor.extract(event); // Checkpoints are written by dd-trace-js in Datadog style only — extract // must use the matching forced-datadog propagator, not the user-configured one. expect(tracerWrapper.extractDatadogOnly).toHaveBeenCalledWith(checkpointHeaders); - expect(tracerWrapper.extract).not.toHaveBeenCalled(); expect(context).toBe(sentinelContext); }); - it("falls back to standard extract for upstream customer headers", () => { - const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/upstream"; - - const upstreamHeaders = { - "x-datadog-trace-id": "111", - traceparent: "00-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-bbbbbbbbbbbbbbbb-01", - }; - - const event = { - DurableExecutionArn: executionArn, - CheckpointToken: "t-upstream", - InitialExecutionState: { - Operations: [ - { - Id: "op-1", - Name: "input", - Status: "RUNNING", - ExecutionDetails: { - InputPayload: JSON.stringify({ headers: upstreamHeaders }), - }, - }, - ], - }, - }; - - const sentinelContext = { sentinel: "upstream" }; - const tracerWrapper = makeTracerWrapper({ standard: sentinelContext }); - const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); - const context = extractor.extract(event); - - // Upstream headers come from arbitrary services; honor the user's - // propagation-style configuration here. - expect(tracerWrapper.extract).toHaveBeenCalledWith(upstreamHeaders); - expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled(); - expect(context).toBe(sentinelContext); - }); - - it("returns null when no checkpoint or upstream context exists", () => { + it("returns null when no checkpoint exists", () => { const tracerWrapper = makeTracerWrapper(); const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); @@ -99,7 +60,6 @@ describe("DurableExecutionEventTraceExtractor", () => { }); expect(context).toBeNull(); - expect(tracerWrapper.extract).not.toHaveBeenCalled(); expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled(); }); }); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index f171c9430..7e5326cf9 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -1,11 +1,10 @@ /** - * Durable Execution Trace Extractor — Checkpoint/Upstream Approach + * Durable Execution Trace Extractor — Checkpoint Approach * * Strategy: - * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. - * 2. If no trace checkpoint exists (first invocation), try upstream trace context - * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. - * 3. If neither exists, return null and let the default extraction path create the context. + * 1. Look for trace context in the latest `_datadog_{N}` checkpoint. + * 2. If no trace checkpoint exists, return null and let the default extraction + * path create the context. * * The extracted context becomes the parent of the `aws.lambda` span (and any * downstream spans created by dd-trace-js, including `aws.durable.execute`). @@ -16,8 +15,6 @@ * The dd-trace-js plugin writes checkpoint headers in **Datadog style only** * (regardless of `DD_TRACE_PROPAGATION_STYLE_INJECT`), so we extract them with * a matching forced-datadog propagator via `TracerWrapper.extractDatadogOnly`. - * Upstream customer-event headers come from arbitrary services and continue to - * be extracted with the user-configured style via `TracerWrapper.extract`. */ import { logDebug } from "../../../utils"; @@ -166,47 +163,7 @@ function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { - try { - const operations = event.InitialExecutionState?.Operations; - if (!operations || operations.length === 0) return null; - - const inputPayloadStr = operations[0].ExecutionDetails?.InputPayload; - if (!inputPayloadStr) return null; - - const customerEvent = JSON.parse(inputPayloadStr); - if (!customerEvent || typeof customerEvent !== "object") return null; - const headers = customerEvent.headers; - if (headers && typeof headers === "object") { - return headers as Record; - } - - const ddData = customerEvent._datadog; - if (ddData && typeof ddData === "object") { - return ddData as Record; - } - } catch (e) { - logDebug(`Failed to read upstream headers from durable input payload: ${e}`); - } - - return null; -} - -/** - * Durable Execution Trace Extractor - * - * Locates trace headers carried inside the durable execution envelope and hands - * them to the standard dd-trace propagator via `TracerWrapper.extract`. Order: - * 1. Latest `_datadog_{N}` checkpoint payload. - * 2. Upstream customer event headers from `InputPayload`. - * 3. Otherwise return null and let the default extraction path take over. - */ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { constructor(private tracerWrapper: TracerWrapper) {} @@ -226,12 +183,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor return this.tracerWrapper.extractDatadogOnly(checkpointHeaders); } - const upstreamHeaders = findUpstreamHeaders(event); - if (upstreamHeaders) { - logDebug("Extracting trace context from upstream durable input payload"); - return this.tracerWrapper.extract(upstreamHeaders); - } - logDebug("No durable trace context found; deferring to default extraction"); return null; }