From 1477ad3a47d0b2a4c766d1d3b09109f8c6ec7ec3 Mon Sep 17 00:00:00 2001 From: Ritwij Aryan Parmar Date: Fri, 5 Jun 2026 04:23:35 -0400 Subject: [PATCH] fix: restore metrics after waitpoint resume --- .changeset/waitpoint-metric-attribution.md | 6 + .../cli-v3/src/entryPoints/dev-run-worker.ts | 1 + .../src/entryPoints/managed-run-worker.ts | 1 + .../core/src/v3/taskContext/index.test.ts | 15 +++ packages/core/src/v3/taskContext/index.ts | 4 + .../src/v3/taskContext/otelProcessors.test.ts | 117 ++++++++++++++++++ 6 files changed, 144 insertions(+) create mode 100644 .changeset/waitpoint-metric-attribution.md create mode 100644 packages/core/src/v3/taskContext/otelProcessors.test.ts diff --git a/.changeset/waitpoint-metric-attribution.md b/.changeset/waitpoint-metric-attribution.md new file mode 100644 index 00000000000..4a22ae41c8c --- /dev/null +++ b/.changeset/waitpoint-metric-attribution.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"trigger.dev": patch +--- + +Restore run metric attribution after waitpoint resume by re-enabling task context when the worker resolves a waitpoint. diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index b7f621954c9..7c097292652 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -679,6 +679,7 @@ const zodIpc = new ZodIpcConnection({ } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { + taskContext.enable(); _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); }, }, diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index ed8fc9be5e7..36e62b2e60c 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -665,6 +665,7 @@ const zodIpc = new ZodIpcConnection({ } }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { + taskContext.enable(); _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); }, }, diff --git a/packages/core/src/v3/taskContext/index.test.ts b/packages/core/src/v3/taskContext/index.test.ts index 34d169a177c..982f907bc7d 100644 --- a/packages/core/src/v3/taskContext/index.test.ts +++ b/packages/core/src/v3/taskContext/index.test.ts @@ -83,4 +83,19 @@ describe("TaskContextAPI conversation id", () => { expect(api.attributes[SemanticInternalAttributes.GEN_AI_CONVERSATION_ID]).toBeUndefined(); }); + + it("can re-enable run attribution after a waitpoint flush disabled it", () => { + const api = TaskContextAPI.getInstance(); + api.setGlobalTaskContext({ ctx: FAKE_CTX, worker: FAKE_WORKER }); + + api.disable(); + expect(api.isRunDisabled).toBe(true); + + api.enable(); + + expect(api.isRunDisabled).toBe(false); + expect(api.attributes[SemanticInternalAttributes.RUN_ID]).toBe("run_1"); + expect(api.attributes[SemanticInternalAttributes.TASK_SLUG]).toBe("my-task"); + expect(api.attributes[SemanticInternalAttributes.ATTEMPT_NUMBER]).toBe(1); + }); }); diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index c4bbf25c972..59e7e709cc8 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -132,6 +132,10 @@ export class TaskContextAPI { this._runDisabled = true; } + public enable() { + this._runDisabled = false; + } + public setGlobalTaskContext(taskContext: TaskContext): boolean { this._runDisabled = false; // Each run boot re-registers the global; clear any conversation id diff --git a/packages/core/src/v3/taskContext/otelProcessors.test.ts b/packages/core/src/v3/taskContext/otelProcessors.test.ts new file mode 100644 index 00000000000..7ff8f3db9ac --- /dev/null +++ b/packages/core/src/v3/taskContext/otelProcessors.test.ts @@ -0,0 +1,117 @@ +import type { ExportResult } from "@opentelemetry/core"; +import type { PushMetricExporter, ResourceMetrics } from "@opentelemetry/sdk-metrics"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; +import { taskContext } from "../task-context-api.js"; +import { unregisterGlobal } from "../utils/globals.js"; +import { TaskContextMetricExporter } from "./otelProcessors.js"; + +const FAKE_CTX = { + attempt: { id: "attempt_1", number: 1, startedAt: new Date(), status: "EXECUTING" as const }, + run: { + id: "run_1", + payload: undefined, + payloadType: "application/json", + context: undefined, + createdAt: new Date(), + tags: ["agent", "waitpoint"], + isTest: false, + isReplay: false, + startedAt: new Date(), + durationMs: 0, + costInCents: 0, + baseCostInCents: 0, + }, + task: { id: "agent-workflow", filePath: "src/trigger/agent.ts", exportName: "agentWorkflow" }, + queue: { id: "queue_1", name: "default" }, + environment: { id: "env_1", slug: "dev", type: "DEVELOPMENT" as const }, + organization: { id: "org_1", slug: "acme", name: "Acme" }, + project: { id: "proj_1", ref: "proj_xyz", slug: "demo", name: "Demo" }, + machine: { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, +} as never; + +const FAKE_WORKER = { id: "worker_1", version: "1.0.0", contentHash: "abc" } as never; + +class CapturingMetricExporter implements PushMetricExporter { + public exports: ResourceMetrics[] = []; + + export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void { + this.exports.push(metrics); + resultCallback({ code: 0 }); + } + + async forceFlush(): Promise {} + + async shutdown(): Promise {} +} + +function createMetrics(): ResourceMetrics { + return { + resource: {} as ResourceMetrics["resource"], + scopeMetrics: [ + { + metrics: [ + { + dataPoints: [ + { + attributes: { existing: "value" }, + }, + ], + }, + ], + scope: { name: "test-scope" }, + }, + ], + } as unknown as ResourceMetrics; +} + +function firstDataPointAttributes(metrics: ResourceMetrics) { + return metrics.scopeMetrics[0]!.metrics[0]!.dataPoints[0]!.attributes; +} + +describe("TaskContextMetricExporter run attribution", () => { + afterEach(() => { + unregisterGlobal("task-context"); + taskContext.setConversationId(undefined); + }); + + it("strips run-specific attributes while run context is disabled between active execution", () => { + taskContext.setGlobalTaskContext({ ctx: FAKE_CTX, worker: FAKE_WORKER }); + taskContext.disable(); + + const innerExporter = new CapturingMetricExporter(); + const exporter = new TaskContextMetricExporter(innerExporter); + + exporter.export(createMetrics(), vi.fn()); + + const attrs = firstDataPointAttributes(innerExporter.exports[0]!); + expect(attrs[SemanticInternalAttributes.RUN_ID]).toBeUndefined(); + expect(attrs[SemanticInternalAttributes.TASK_SLUG]).toBeUndefined(); + expect(attrs[SemanticInternalAttributes.ATTEMPT_NUMBER]).toBeUndefined(); + expect(attrs[SemanticInternalAttributes.ENVIRONMENT_ID]).toBe("env_1"); + expect(attrs[SemanticInternalAttributes.PROJECT_ID]).toBe("proj_1"); + }); + + it("restores run attribution after waitpoint resume re-enables task context", () => { + taskContext.setGlobalTaskContext({ ctx: FAKE_CTX, worker: FAKE_WORKER }); + taskContext.disable(); + taskContext.enable(); + + const innerExporter = new CapturingMetricExporter(); + const exporter = new TaskContextMetricExporter(innerExporter); + + exporter.export(createMetrics(), vi.fn()); + + const attrs = firstDataPointAttributes(innerExporter.exports[0]!); + expect(attrs[SemanticInternalAttributes.RUN_ID]).toBe("run_1"); + expect(attrs[SemanticInternalAttributes.TASK_SLUG]).toBe("agent-workflow"); + expect(attrs[SemanticInternalAttributes.ATTEMPT_NUMBER]).toBe(1); + expect(attrs[SemanticInternalAttributes.RUN_TAGS]).toEqual(["agent", "waitpoint"]); + expect(attrs[SemanticInternalAttributes.WORKER_ID]).toBe("worker_1"); + }); +});