Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/trace/context/extractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { XrayService } from "../xray-service";
import {
AppSyncEventTraceExtractor,
CustomTraceExtractor,
DurableExecutionEventTraceExtractor,
EventBridgeEventTraceExtractor,
EventBridgeSQSEventTraceExtractor,
HTTPEventTraceExtractor,
isDurableExecutionEvent,
KinesisEventTraceExtractor,
LambdaContextTraceExtractor,
SNSEventTraceExtractor,
Expand Down Expand Up @@ -56,6 +58,9 @@ export class TraceContextExtractor {
}
}

// No stripping needed — trace context is stored in dedicated
// `_datadog_{N}` checkpoint operations.

if (spanContext === null) {
this.stepFunctionContextService = StepFunctionContextService.instance(event);
if (this.stepFunctionContextService?.context) {
Expand All @@ -81,6 +86,9 @@ export class TraceContextExtractor {
private getTraceEventExtractor(event: any): EventTraceExtractor | undefined {
if (!event || typeof event !== "object") return;

// Check for durable execution event first (has DurableExecutionArn + CheckpointToken)
if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper);

const headers = event.headers ?? event.multiValueHeaders;
if (headers !== null && typeof headers === "object") {
return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext);
Expand Down
65 changes: 65 additions & 0 deletions src/trace/context/extractors/durable-execution.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { DurableExecutionEventTraceExtractor } from "./durable-execution";
import { TracerWrapper } from "../../tracer-wrapper";

function makeTracerWrapper(datadogOnlyReturn: any = null): TracerWrapper {
return {
extractDatadogOnly: jest.fn().mockReturnValue(datadogOnlyReturn),
} as unknown as TracerWrapper;
}

describe("DurableExecutionEventTraceExtractor", () => {
beforeEach(() => {
jest.clearAllMocks();
});

it("extracts checkpoint headers with the datadog-only propagator", () => {
const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc";

const checkpointHeaders = {
"x-datadog-trace-id": "149750110124521191",
"x-datadog-parent-id": "987654321012345678",
"x-datadog-sampling-priority": "1",
};

const event = {
DurableExecutionArn: executionArn,
CheckpointToken: "t-1",
InitialExecutionState: {
Operations: [
{
Id: "op-1",
Name: "_datadog_0",
Status: "SUCCEEDED",
StepDetails: {
Result: JSON.stringify(checkpointHeaders),
},
},
],
},
};

const sentinelContext = { sentinel: true };
const tracerWrapper = makeTracerWrapper(sentinelContext);
const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);
const context = extractor.extract(event);

// Checkpoints are written by dd-trace-js in Datadog style only — extract
// must use the matching forced-datadog propagator, not the user-configured one.
expect(tracerWrapper.extractDatadogOnly).toHaveBeenCalledWith(checkpointHeaders);
expect(context).toBe(sentinelContext);
});

it("returns null when no checkpoint exists", () => {
const tracerWrapper = makeTracerWrapper();
const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);

const context = extractor.extract({
DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo",
CheckpointToken: "t-empty",
InitialExecutionState: { Operations: [] },
});

expect(context).toBeNull();
expect(tracerWrapper.extractDatadogOnly).not.toHaveBeenCalled();
});
});
258 changes: 258 additions & 0 deletions src/trace/context/extractors/durable-execution.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/**
* Durable Execution Trace Extractor — Checkpoint Approach
*
* Strategy:
* 1. Look for trace context in the latest `_datadog_{N}` checkpoint.
* 2. If no trace checkpoint exists, return null and let the default extraction
* path create the context.
*
* The extracted context becomes the parent of the `aws.lambda` span (and any
* downstream spans created by dd-trace-js, including `aws.durable.execute`).
* This integration no longer creates a separate root span — anchoring to the
* first `aws.durable.execute` span in dd-trace-js is the canonical entry point
* for a durable execution.
*
* The dd-trace-js plugin writes checkpoint headers in **Datadog style only**
* (regardless of `DD_TRACE_PROPAGATION_STYLE_INJECT`), so we extract them with
* a matching forced-datadog propagator via `TracerWrapper.extractDatadogOnly`.
*/

import { logDebug } from "../../../utils";
import { SpanContextWrapper } from "../../span-context-wrapper";
import { TracerWrapper } from "../../tracer-wrapper";
import { EventTraceExtractor } from "../extractor";

/**
* Interface for operation data in durable execution state
*/
export interface DurableExecutionOperation {
Id: string;
Status: string;
Type?: string;
Name?: string;
ExecutionDetails?: {
InputPayload?: string;
};
StepDetails?: {
Result?: string;
Error?: unknown;
NextAttemptTimestamp?: string;
};
Payload?: string;
CallbackDetails?: {
Result?: string;
CallbackId?: string;
Error?: unknown;
};
StartedAt?: string;
StartTimestamp?: number;
CompletedAt?: string;
}

/**
* Interface for initial execution state in durable execution events
*/
export interface InitialExecutionState {
Operations?: DurableExecutionOperation[];
Status?: string;
}

/**
* Interface for durable execution event
*/
export interface DurableExecutionEvent {
DurableExecutionArn?: string;
CheckpointToken?: string;
InitialExecutionState?: InitialExecutionState;
Input?: unknown;
}

/**
* Check if event is a durable execution event
*/
export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent {
if (!event || typeof event !== "object") {
return false;
}

const maybeEvent = event as Record<string, unknown>;
return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken);
}

/**
* Check if this is a replay invocation (has previous operations)
*/
export function isDurableExecutionReplay(event: unknown): boolean {
if (!isDurableExecutionEvent(event)) {
return false;
}

const operations = event.InitialExecutionState?.Operations;
return Array.isArray(operations) && operations.length > 0;
}

/**
* Get durable execution ARN from event
*/
export function getDurableExecutionArn(event: unknown): string | undefined {
if (!isDurableExecutionEvent(event)) {
return undefined;
}
return event.DurableExecutionArn;
}

/**
* Get checkpoint token from event
*/
export function getCheckpointToken(event: unknown): string | undefined {
if (!isDurableExecutionEvent(event)) {
return undefined;
}
return event.CheckpointToken;
}

// Terminal operation statuses that indicate an operation has completed
const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]);

const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_";

function parseTraceCheckpointNumber(name: unknown): number | null {
if (typeof name !== "string") return null;

if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null;
const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length);
const n = Number.parseInt(suffix, 10);
if (Number.isNaN(n) || String(n) !== suffix) return null;
return n;
}

/**
* Find the highest-numbered `_datadog_{N}` checkpoint in the event and return
* its parsed header dict.
*
* Each invocation that changes trace context saves a new checkpoint with N+1;
* the one with the highest N is the most recent. Headers are written by the
* dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the
* payload is a standard HTTP-style header dict.
*
*/
function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record<string, string> | null {
const operations = event.InitialExecutionState?.Operations;
if (!operations || operations.length === 0) return null;

let best: { number: number; op: DurableExecutionOperation } | null = null;
for (const op of operations) {
const n = parseTraceCheckpointNumber(op?.Name);
if (n === null) continue;
if (best === null || n > best.number) {
best = { number: n, op };
}
}
if (best === null) return null;

const raw = best.op.Payload ?? best.op.StepDetails?.Result;
if (!raw || typeof raw !== "string") return null;
try {
const parsed = JSON.parse(raw);
if (parsed && typeof parsed === "object") {
return parsed as Record<string, string>;
}
} catch (e) {
logDebug(`Failed to parse trace checkpoint payload: ${e}`);
}
return null;
}


export class DurableExecutionEventTraceExtractor implements EventTraceExtractor {
constructor(private tracerWrapper: TracerWrapper) {}

extract(event: unknown): SpanContextWrapper | null {
if (!isDurableExecutionEvent(event)) {
logDebug("Event is not a durable execution event");
return null;
}
if (!event.DurableExecutionArn) {
logDebug("No DurableExecutionArn in event");
return null;
}

const checkpointHeaders = findLatestCheckpointHeaders(event);
if (checkpointHeaders) {
logDebug("Extracting trace context from durable checkpoint (datadog-only)");
return this.tracerWrapper.extractDatadogOnly(checkpointHeaders);
}

logDebug("No durable trace context found; deferring to default extraction");
return null;
}
}

/**
* Utility to check if a durable operation is a replay
*
* An operation is a replay if it exists in the initial execution state
* with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT)
*
* @param event - Lambda event
* @param stepId - The step ID to check (may be hashed)
* @returns true if the operation is a replay
*/
export function isOperationReplay(event: unknown, stepId: string): boolean {
if (!isDurableExecutionEvent(event)) {
return false;
}

const operations = event.InitialExecutionState?.Operations;
if (!operations || operations.length === 0) {
return false;
}

const operation = operations.find((op) => op.Id === stepId);
if (!operation) {
return false;
}

return TERMINAL_STATUSES.has(operation.Status);
}

/**
* Get the replay status of an operation
*
* @param event - Lambda event
* @param stepId - The step ID to check
* @returns Operation status if found, undefined otherwise
*/
export function getOperationStatus(event: unknown, stepId: string): string | undefined {
if (!isDurableExecutionEvent(event)) {
return undefined;
}

const operations = event.InitialExecutionState?.Operations;
if (!operations) {
return undefined;
}

const operation = operations.find((op) => op.Id === stepId);
return operation?.Status;
}

/**
* Count the number of completed operations in the event
*
* @param event - Lambda event
* @returns Number of completed operations
*/
export function getCompletedOperationCount(event: unknown): number {
if (!isDurableExecutionEvent(event)) {
return 0;
}

const operations = event.InitialExecutionState?.Operations;
if (!operations) {
return 0;
}

return operations.filter((op) => op.Status === "SUCCEEDED" || op.Status === "FAILED").length;
}

1 change: 1 addition & 0 deletions src/trace/context/extractors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs";
export { StepFunctionEventTraceExtractor } from "./step-function";
export { LambdaContextTraceExtractor } from "./lambda-context";
export { CustomTraceExtractor } from "./custom";
export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay } from "./durable-execution";
Loading
Loading