Skip to content

Commit 272dfb6

Browse files
committed
correlated external traces/logs across multiple runs with external trace context
1 parent bf98996 commit 272dfb6

File tree

18 files changed

+288
-85
lines changed

18 files changed

+288
-85
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,9 @@ const { action, loader } = createActionApiRoute(
9393
const service = new TriggerTaskService();
9494

9595
try {
96-
const traceContext =
97-
traceparent && isFromWorker /// If the request is from a worker, we should pass the trace context
98-
? { traceparent, tracestate }
99-
: undefined;
96+
const traceContext = isFromWorker
97+
? { traceparent, tracestate }
98+
: { external: { traceparent, tracestate } };
10099

101100
const oneTimeUseToken = await getOneTimeUseToken(authentication);
102101

@@ -111,6 +110,14 @@ const { action, loader } = createActionApiRoute(
111110
traceContext,
112111
});
113112

113+
logger.debug("[otelContext]", {
114+
taskId: params.taskId,
115+
headers,
116+
options: body.options,
117+
isFromWorker,
118+
traceContext,
119+
});
120+
114121
const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
115122

116123
const result = await service.call(

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,14 @@ import {
1010
taskRunErrorEnhancer,
1111
taskRunErrorToString,
1212
TriggerTaskRequestBody,
13+
TriggerTraceContext,
1314
} from "@trigger.dev/core/v3";
14-
import { RunId, stringifyDuration } from "@trigger.dev/core/v3/isomorphic";
15+
import {
16+
parseTraceparent,
17+
RunId,
18+
serializeTraceparent,
19+
stringifyDuration,
20+
} from "@trigger.dev/core/v3/isomorphic";
1521
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
1622
import { createTags } from "~/models/taskRunTag.server";
1723
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -253,7 +259,11 @@ export class RunEngineTriggerTaskService {
253259
payload: payloadPacket.data ?? "",
254260
payloadType: payloadPacket.dataType,
255261
context: body.context,
256-
traceContext: event.traceContext,
262+
traceContext: this.#propagateExternalTraceContext(
263+
event.traceContext,
264+
parentRun?.traceContext,
265+
event.traceparent?.spanId
266+
),
257267
traceId: event.traceId,
258268
spanId: event.spanId,
259269
parentSpanId:
@@ -341,4 +351,49 @@ export class RunEngineTriggerTaskService {
341351
}
342352
});
343353
}
354+
355+
#propagateExternalTraceContext(
356+
traceContext: Record<string, unknown>,
357+
parentRunTraceContext: unknown,
358+
parentSpanId: string | undefined
359+
): Record<string, unknown> {
360+
if (!parentRunTraceContext) {
361+
return traceContext;
362+
}
363+
364+
const parsedParentRunTraceContext = TriggerTraceContext.safeParse(parentRunTraceContext);
365+
366+
if (!parsedParentRunTraceContext.success) {
367+
return traceContext;
368+
}
369+
370+
const { external } = parsedParentRunTraceContext.data;
371+
372+
if (!external) {
373+
return traceContext;
374+
}
375+
376+
if (!external.traceparent) {
377+
return traceContext;
378+
}
379+
380+
const parsedTraceparent = parseTraceparent(external.traceparent);
381+
382+
if (!parsedTraceparent) {
383+
return traceContext;
384+
}
385+
386+
const newExternalTraceparent = serializeTraceparent(
387+
parsedTraceparent.traceId,
388+
parentSpanId ?? parsedTraceparent.spanId
389+
);
390+
391+
return {
392+
...traceContext,
393+
external: {
394+
...external,
395+
traceparent: newExternalTraceparent,
396+
},
397+
};
398+
}
344399
}

apps/webapp/app/runEngine/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export type TriggerTaskServiceOptions = {
1212
idempotencyKey?: string;
1313
idempotencyKeyExpiresAt?: Date;
1414
triggerVersion?: string;
15-
traceContext?: Record<string, string | undefined>;
15+
traceContext?: Record<string, unknown>;
1616
spanParentAsLink?: boolean;
1717
parentAsLinkType?: "replay" | "trigger";
1818
batchId?: string;
@@ -119,7 +119,7 @@ export interface TriggerTaskValidator {
119119
export type TracedEventSpan = {
120120
traceId: string;
121121
spanId: string;
122-
traceContext: Record<string, string | undefined>;
122+
traceContext: Record<string, unknown>;
123123
traceparent?: {
124124
traceId: string;
125125
spanId: string;

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import { Attributes, AttributeValue, Link, trace, TraceFlags, Tracer } from "@op
22
import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
33
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
44
import {
5+
correctErrorStackTrace,
6+
createPacketAttributesAsJson,
57
ExceptionEventProperties,
68
ExceptionSpanEvent,
9+
flattenAttributes,
10+
isExceptionSpanEvent,
711
NULL_SENTINEL,
12+
omit,
813
PRIMARY_VARIANT,
914
SemanticInternalAttributes,
1015
SpanEvent,
@@ -13,28 +18,24 @@ import {
1318
TaskEventEnvironment,
1419
TaskEventStyle,
1520
TaskRunError,
16-
correctErrorStackTrace,
17-
createPacketAttributesAsJson,
18-
flattenAttributes,
19-
isExceptionSpanEvent,
20-
omit,
2121
unflattenAttributes,
2222
} from "@trigger.dev/core/v3";
23+
import { parseTraceparent, serializeTraceparent } from "@trigger.dev/core/v3/isomorphic";
2324
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database";
25+
import { nanoid } from "nanoid";
2426
import { createHash } from "node:crypto";
2527
import { EventEmitter } from "node:stream";
2628
import { Gauge } from "prom-client";
27-
import { $replica, PrismaClient, PrismaReplicaClient, prisma } from "~/db.server";
29+
import { $replica, prisma, PrismaClient, PrismaReplicaClient } from "~/db.server";
2830
import { env } from "~/env.server";
2931
import { metricsRegister } from "~/metrics.server";
32+
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
3033
import { logger } from "~/services/logger.server";
3134
import { singleton } from "~/utils/singleton";
3235
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
36+
import { TaskEventStore, TaskEventStoreTable } from "./taskEventStore.server";
3337
import { startActiveSpan } from "./tracer.server";
34-
import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis.server";
3538
import { startSpan } from "./tracing.server";
36-
import { nanoid } from "nanoid";
37-
import { TaskEventStore, TaskEventStoreTable } from "./taskEventStore.server";
3839

3940
const MAX_FLUSH_DEPTH = 5;
4041

@@ -80,7 +81,7 @@ export type SetAttribute<T extends TraceAttributes> = (key: keyof T, value: T[ke
8081

8182
export type TraceEventOptions = {
8283
kind?: CreatableEventKind;
83-
context?: Record<string, string | undefined>;
84+
context?: Record<string, unknown>;
8485
spanParentAsLink?: boolean;
8586
parentAsLinkType?: "trigger" | "replay";
8687
spanIdSeed?: string;
@@ -932,7 +933,7 @@ export class EventRepository {
932933
traceId,
933934
spanId,
934935
parentId,
935-
tracestate,
936+
tracestate: typeof tracestate === "string" ? tracestate : undefined,
936937
message: message,
937938
serviceName: "api server",
938939
serviceNamespace: "trigger.dev",
@@ -989,6 +990,11 @@ export class EventRepository {
989990
): Promise<TResult> {
990991
const propagatedContext = extractContextFromCarrier(options.context ?? {});
991992

993+
logger.debug("[otelContext]", {
994+
propagatedContext,
995+
options,
996+
});
997+
992998
const start = process.hrtime.bigint();
993999
const startTime = options.startTime ?? getNowInNanoseconds();
9941000

@@ -1002,7 +1008,8 @@ export class EventRepository {
10021008
: this.generateSpanId();
10031009

10041010
const traceContext = {
1005-
traceparent: `00-${traceId}-${spanId}-01`,
1011+
...options.context,
1012+
traceparent: serializeTraceparent(traceId, spanId),
10061013
};
10071014

10081015
const links: Link[] =
@@ -1087,7 +1094,7 @@ export class EventRepository {
10871094
traceId,
10881095
spanId,
10891096
parentId,
1090-
tracestate,
1097+
tracestate: typeof tracestate === "string" ? tracestate : undefined,
10911098
duration: options.incomplete ? 0 : duration,
10921099
isPartial: failedWithError ? false : options.incomplete,
10931100
isError: !!failedWithError,
@@ -1493,36 +1500,21 @@ function excludePartialEventsWithCorrespondingFullEvent(batch: CreatableEvent[])
14931500
);
14941501
}
14951502

1496-
export function extractContextFromCarrier(carrier: Record<string, string | undefined>) {
1503+
export function extractContextFromCarrier(carrier: Record<string, unknown>) {
14971504
const traceparent = carrier["traceparent"];
14981505
const tracestate = carrier["tracestate"];
14991506

1507+
if (typeof traceparent !== "string") {
1508+
return undefined;
1509+
}
1510+
15001511
return {
1512+
...carrier,
15011513
traceparent: parseTraceparent(traceparent),
15021514
tracestate,
15031515
};
15041516
}
15051517

1506-
function parseTraceparent(traceparent?: string): { traceId: string; spanId: string } | undefined {
1507-
if (!traceparent) {
1508-
return undefined;
1509-
}
1510-
1511-
const parts = traceparent.split("-");
1512-
1513-
if (parts.length !== 4) {
1514-
return undefined;
1515-
}
1516-
1517-
const [version, traceId, spanId, flags] = parts;
1518-
1519-
if (version !== "00") {
1520-
return undefined;
1521-
}
1522-
1523-
return { traceId, spanId };
1524-
}
1525-
15261518
function prepareEvent(event: QueriedEvent): PreparedEvent {
15271519
return {
15281520
...event,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export type TriggerTaskServiceOptions = {
1919
idempotencyKey?: string;
2020
idempotencyKeyExpiresAt?: Date;
2121
triggerVersion?: string;
22-
traceContext?: Record<string, string | undefined>;
22+
traceContext?: Record<string, unknown>;
2323
spanParentAsLink?: boolean;
2424
parentAsLinkType?: "replay" | "trigger";
2525
batchId?: string;

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ export class RunEngine {
385385
payload,
386386
payloadType,
387387
context,
388-
traceContext,
388+
traceContext: traceContext as any,
389389
traceId,
390390
spanId,
391391
parentSpanId,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export type TriggerParams = {
8989
payload: string;
9090
payloadType: string;
9191
context: any;
92-
traceContext: Record<string, string | undefined>;
92+
traceContext: Record<string, unknown>;
9393
traceId: string;
9494
spanId: string;
9595
parentSpanId?: string;

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async function loadWorkerManifest() {
165165
return WorkerManifest.parse(raw);
166166
}
167167

168-
async function doBootstrap() {
168+
async function doBootstrap(traceContext: Record<string, unknown>) {
169169
return await runTimelineMetrics.measureMetric("trigger.dev/start", "bootstrap", {}, async () => {
170170
log("Bootstrapping worker");
171171

@@ -182,6 +182,7 @@ async function doBootstrap() {
182182
logExporters: config.telemetry?.logExporters ?? [],
183183
diagLogLevel: (env.TRIGGER_OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none",
184184
forceFlushTimeoutMillis: 30_000,
185+
externalTraceContext: traceContext.external,
185186
});
186187

187188
const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION);
@@ -265,9 +266,9 @@ let bootstrapCache:
265266
}
266267
| undefined;
267268

268-
async function bootstrap() {
269+
async function bootstrap(traceContext: Record<string, unknown>) {
269270
if (!bootstrapCache) {
270-
bootstrapCache = await doBootstrap();
271+
bootstrapCache = await doBootstrap(traceContext);
271272
}
272273

273274
return bootstrapCache;
@@ -371,8 +372,9 @@ const zodIpc = new ZodIpcConnection({
371372
});
372373

373374
try {
374-
const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } =
375-
await bootstrap();
375+
const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } = await bootstrap(
376+
traceContext
377+
);
376378

377379
_tracingSDK = tracingSDK;
378380

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async function loadWorkerManifest() {
156156
return WorkerManifest.parse(raw);
157157
}
158158

159-
async function doBootstrap() {
159+
async function doBootstrap(traceContext: Record<string, unknown>) {
160160
return await runTimelineMetrics.measureMetric("trigger.dev/start", "bootstrap", {}, async () => {
161161
const workerManifest = await loadWorkerManifest();
162162

@@ -173,6 +173,7 @@ async function doBootstrap() {
173173
forceFlushTimeoutMillis: 30_000,
174174
exporters: config.telemetry?.exporters ?? [],
175175
logExporters: config.telemetry?.logExporters ?? [],
176+
externalTraceContext: traceContext.external,
176177
});
177178

178179
const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION);
@@ -254,9 +255,9 @@ let bootstrapCache:
254255
}
255256
| undefined;
256257

257-
async function bootstrap() {
258+
async function bootstrap(traceContext: Record<string, unknown>) {
258259
if (!bootstrapCache) {
259-
bootstrapCache = await doBootstrap();
260+
bootstrapCache = await doBootstrap(traceContext);
260261
}
261262

262263
return bootstrapCache;
@@ -375,8 +376,9 @@ const zodIpc = new ZodIpcConnection({
375376
});
376377

377378
try {
378-
const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } =
379-
await bootstrap();
379+
const { tracer, tracingSDK, consoleInterceptor, config, workerManifest } = await bootstrap(
380+
traceContext
381+
);
380382

381383
_tracingSDK = tracingSDK;
382384

packages/core/src/v3/apiClient/core.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,10 +617,6 @@ export function hasOwn(obj: Object, key: string): boolean {
617617
function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
618618
const headers = new Headers(requestInit?.headers);
619619

620-
if (headers.get("x-trigger-worker") !== "true") {
621-
return requestInit;
622-
}
623-
624620
const headersObject = Object.fromEntries(headers.entries());
625621

626622
propagation.inject(context.active(), headersObject);

0 commit comments

Comments
 (0)