Skip to content

Commit 20362af

Browse files
fix(webapp): reconcile trace with run lifecycle to handle clickhouse lag
1 parent 12947e0 commit 20362af

File tree

2 files changed

+241
-39
lines changed

2 files changed

+241
-39
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 100 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { getUsername } from "~/utils/username";
66
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
77
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
88
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
9-
import { isFinalRunStatus } from "~/v3/taskStatus";
9+
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1010
import { env } from "~/env.server";
1111

1212
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
@@ -215,55 +215,53 @@ export class RunPresenter {
215215

216216
const events = tree
217217
? flattenTree(tree).map((n) => {
218-
const offset = millisecondsToNanoseconds(
219-
n.data.startTime.getTime() - treeRootStartTimeMs
220-
);
221-
//only let non-debug events extend the total duration
222-
if (!n.data.isDebug) {
223-
totalDuration = Math.max(totalDuration, offset + n.data.duration);
224-
}
218+
const offset = millisecondsToNanoseconds(
219+
n.data.startTime.getTime() - treeRootStartTimeMs
220+
);
221+
//only let non-debug events extend the total duration
222+
if (!n.data.isDebug) {
223+
totalDuration = Math.max(totalDuration, offset + n.data.duration);
224+
}
225225

226-
// For cached spans, store the mapping from spanId to the linked run's ID
227-
if (n.data.style?.icon === "task-cached" && n.runId) {
228-
linkedRunIdBySpanId[n.id] = n.runId;
229-
}
226+
// For cached spans, store the mapping from spanId to the linked run's ID
227+
if (n.data.style?.icon === "task-cached" && n.runId) {
228+
linkedRunIdBySpanId[n.id] = n.runId;
229+
}
230230

231-
return {
232-
...n,
233-
data: {
234-
...n.data,
235-
timelineEvents: createTimelineSpanEventsFromSpanEvents(
236-
n.data.events,
237-
user?.admin ?? false,
238-
treeRootStartTimeMs
239-
),
240-
//set partial nodes to null duration
241-
duration: n.data.isPartial ? null : n.data.duration,
242-
offset,
243-
isRoot: n.id === traceSummary.rootSpan.id,
244-
},
245-
};
246-
})
231+
return {
232+
...n,
233+
data: {
234+
...n.data,
235+
timelineEvents: createTimelineSpanEventsFromSpanEvents(
236+
n.data.events,
237+
user?.admin ?? false,
238+
treeRootStartTimeMs
239+
),
240+
//set partial nodes to null duration
241+
duration: n.data.isPartial ? null : n.data.duration,
242+
offset,
243+
isRoot: n.id === traceSummary.rootSpan.id,
244+
},
245+
};
246+
})
247247
: [];
248248

249249
//total duration should be a minimum of 1ms
250250
totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1));
251251

252-
let rootSpanStatus: "executing" | "completed" | "failed" = "executing";
253-
if (events[0]) {
254-
if (events[0].data.isError) {
255-
rootSpanStatus = "failed";
256-
} else if (!events[0].data.isPartial) {
257-
rootSpanStatus = "completed";
258-
}
259-
}
252+
const reconciled = reconcileTraceWithRunLifecycle(
253+
runData,
254+
traceSummary.rootSpan.id,
255+
events,
256+
totalDuration
257+
);
260258

261259
return {
262260
run: runData,
263261
trace: {
264-
rootSpanStatus,
265-
events: events,
266-
duration: totalDuration,
262+
rootSpanStatus: reconciled.rootSpanStatus,
263+
events: reconciled.events,
264+
duration: reconciled.totalDuration,
267265
rootStartedAt: tree?.data.startTime,
268266
startedAt: run.startedAt,
269267
queuedDuration: run.startedAt
@@ -276,3 +274,66 @@ export class RunPresenter {
276274
};
277275
}
278276
}
277+
278+
/**
 * Reconciles trace data with the run lifecycle.
 *
 * NOTE: ClickHouse trace ingestion is eventually consistent. When a run is
 * marked finished in Postgres, the root span is reconciled to reflect
 * completion even if telemetry is still partial. This is a deliberate
 * UI-layer tradeoff to prevent stale or "stuck" run states in the dashboard.
 *
 * The input `events` array and its elements are never mutated; a reconciled
 * root span is returned as a shallow copy.
 *
 * @param runData - Lifecycle fields of the run. `rootTaskRun.createdAt`, when
 *   present, is used instead of `createdAt` as the start of the run duration.
 * @param rootSpanId - ID of the span to finalize when it is still partial.
 * @param events - Flattened trace events; the first entry is read as the root
 *   span when deriving the status reported by the trace itself.
 * @param totalDuration - Trace duration derived from telemetry, in nanoseconds.
 * @returns The (possibly reconciled) events, the total duration extended to at
 *   least the run duration, and the status to display for the root span.
 */
export function reconcileTraceWithRunLifecycle(
  runData: {
    isFinished: boolean;
    status: Run["status"];
    createdAt: Date;
    completedAt: Date | null;
    rootTaskRun: { createdAt: Date } | null;
  },
  rootSpanId: string,
  events: RunEvent[],
  totalDuration: number
): {
  events: RunEvent[];
  totalDuration: number;
  rootSpanStatus: "executing" | "completed" | "failed";
} {
  // Status as reported by telemetry alone: an error wins, then "not partial"
  // means completed, anything else (including no events yet) is still executing.
  const firstEvent = events[0];
  const traceStatus: "executing" | "completed" | "failed" = !firstEvent
    ? "executing"
    : firstEvent.data.isError
    ? "failed"
    : firstEvent.data.isPartial
    ? "executing"
    : "completed";

  // While the run is still in flight the trace is the only source of truth.
  if (!runData.isFinished) {
    return { events, totalDuration, rootSpanStatus: traceStatus };
  }

  // Evaluated once so the root event and the returned status cannot disagree.
  const runFailed = isFailedRunStatus(runData.status);

  // Without a completion timestamp there is nothing to extend durations with.
  const runStartedAt = runData.rootTaskRun?.createdAt ?? runData.createdAt;
  const postgresRunDuration = runData.completedAt
    ? millisecondsToNanoseconds(runData.completedAt.getTime() - runStartedAt.getTime())
    : 0;

  const updatedEvents = events.map((e) => {
    // Only a root span that telemetry still reports as partial needs finalizing.
    if (e.id !== rootSpanId || !e.data.isPartial) {
      return e;
    }

    return {
      ...e,
      data: {
        ...e.data,
        isPartial: false,
        duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
        // Keep an error the trace already recorded; the run status may only add one.
        isError: e.data.isError || runFailed,
      },
    };
  });

  return {
    events: updatedEvents,
    totalDuration: Math.max(totalDuration, postgresRunDuration),
    // A failure reported by either source must not be downgraded to "completed".
    rootSpanStatus: runFailed || traceStatus === "failed" ? "failed" : "completed",
  };
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { vi, describe, it, expect } from "vitest";
2+
3+
// Module mocks for the presenter's server-side imports. They are declared
// before the import of RunPresenter.server further down; presumably so that
// loading the presenter needs no real env, database or event store — confirm.

// Stub the env module with the single setting provided here.
vi.mock("../app/env.server", () => ({
4+
env: {
5+
MAXIMUM_LIVE_RELOADING_EVENTS: 1000,
6+
},
7+
}));
8+
9+
// Stub the database module: empty client objects and a mocked $transaction.
vi.mock("../app/db.server", () => ({
10+
prisma: {},
11+
$replica: {},
12+
$transaction: vi.fn(),
13+
}));
14+
15+
// Stub the event repository resolver with a bare mock function.
vi.mock("../app/v3/eventRepository/index.server", () => ({
16+
resolveEventRepositoryForStore: vi.fn(),
17+
}));
18+
19+
// Stub the task event store table lookup with a bare mock function.
vi.mock("../app/v3/taskEventStore.server", () => ({
20+
getTaskEventStoreTableForRun: vi.fn(),
21+
}));
22+
23+
// Stub the username helper with a bare mock function.
vi.mock("../app/utils/username", () => ({
24+
getUsername: vi.fn(),
25+
}));
26+
27+
import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/RunPresenter.server";
28+
import { millisecondsToNanoseconds } from "@trigger.dev/core/v3";
29+
30+
// Unit tests for reconcileTraceWithRunLifecycle. Durations are asserted with
// exact values so that an over-extended duration fails the suite instead of
// slipping past a ">=" check.
describe("reconcileTraceWithRunLifecycle", () => {
  const rootSpanId = "root-span-id";
  const createdAt = new Date("2024-01-01T00:00:00Z");
  const completedAt = new Date("2024-01-01T00:00:05Z"); // 5s after createdAt

  // Fixtures are intentionally partial (only the fields the function reads),
  // which is why they are typed loosely.
  const runData: any = {
    isFinished: true,
    status: "COMPLETED_SUCCESSFULLY",
    createdAt,
    completedAt,
    rootTaskRun: null,
  };

  const initialEvents = [
    {
      id: rootSpanId,
      data: {
        isPartial: true,
        duration: millisecondsToNanoseconds(1000), // 1s, less than the 5s run duration
        isError: false,
      },
    },
    {
      id: "child-span-id",
      data: {
        isPartial: false,
        duration: millisecondsToNanoseconds(500),
        isError: false,
      },
    },
  ];

  it("should reconcile a finished run with lagging partial telemetry", () => {
    const totalDuration = millisecondsToNanoseconds(1000);
    const result = reconcileTraceWithRunLifecycle(
      runData,
      rootSpanId,
      initialEvents as any,
      totalDuration
    );

    expect(result.rootSpanStatus).toBe("completed");

    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.isPartial).toBe(false);
    // The run took 5s in Postgres, so both durations are extended to exactly 5s.
    expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(5000));
    expect(result.totalDuration).toBe(millisecondsToNanoseconds(5000));
  });

  it("should not override duration if Clickhouse already has a longer finished duration", () => {
    const longDuration = millisecondsToNanoseconds(10000);
    const finishedEvents = [
      {
        id: rootSpanId,
        data: {
          isPartial: false,
          duration: longDuration,
          isError: false,
        },
      },
    ];

    const result = reconcileTraceWithRunLifecycle(
      runData,
      rootSpanId,
      finishedEvents as any,
      longDuration
    );

    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.duration).toBe(longDuration);
    expect(rootEvent?.data.isPartial).toBe(false);
    expect(result.totalDuration).toBe(longDuration);
  });

  it("should handle unfinished runs without modification", () => {
    const unfinishedRun = { ...runData, isFinished: false, completedAt: null };
    const totalDuration = millisecondsToNanoseconds(1000);
    const result = reconcileTraceWithRunLifecycle(
      unfinishedRun,
      rootSpanId,
      initialEvents as any,
      totalDuration
    );

    expect(result.rootSpanStatus).toBe("executing");
    expect(result.totalDuration).toBe(totalDuration);

    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.isPartial).toBe(true);
    expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000));
  });

  it("should reconcile failed runs correctly", () => {
    const failedRun = { ...runData, status: "COMPLETED_WITH_ERRORS" };
    const result = reconcileTraceWithRunLifecycle(
      failedRun,
      rootSpanId,
      initialEvents as any,
      millisecondsToNanoseconds(1000)
    );

    expect(result.rootSpanStatus).toBe("failed");
    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.isError).toBe(true);
    expect(rootEvent?.data.isPartial).toBe(false);
  });

  it("should use rootTaskRun createdAt if available for duration calculation", () => {
    const rootTaskCreatedAt = new Date("2023-12-31T23:59:50Z"); // 10s before run.createdAt
    const runDataWithRoot: any = {
      ...runData,
      rootTaskRun: { createdAt: rootTaskCreatedAt },
    };

    const result = reconcileTraceWithRunLifecycle(
      runDataWithRoot,
      rootSpanId,
      initialEvents as any,
      millisecondsToNanoseconds(1000)
    );

    // Duration should be from 23:59:50 to 00:00:05 = 15s
    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(15000));
    expect(result.totalDuration).toBe(millisecondsToNanoseconds(15000));
  });

  it("should handle missing root span gracefully", () => {
    const result = reconcileTraceWithRunLifecycle(
      runData,
      "non-existent-id",
      initialEvents as any,
      millisecondsToNanoseconds(1000)
    );

    expect(result.rootSpanStatus).toBe("completed");
    expect(result.events).toEqual(initialEvents);
    // totalDuration should still be updated to postgres duration even if root span is missing from events list
    expect(result.totalDuration).toBe(millisecondsToNanoseconds(5000));
  });

  it("should finalize the root span without extending durations when completedAt is missing", () => {
    const finishedWithoutTimestamp = { ...runData, completedAt: null };
    const totalDuration = millisecondsToNanoseconds(1000);
    const result = reconcileTraceWithRunLifecycle(
      finishedWithoutTimestamp,
      rootSpanId,
      initialEvents as any,
      totalDuration
    );

    expect(result.rootSpanStatus).toBe("completed");
    const rootEvent = result.events.find((e: any) => e.id === rootSpanId);
    expect(rootEvent?.data.isPartial).toBe(false);
    // No completion timestamp means telemetry durations are kept as they are.
    expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000));
    expect(result.totalDuration).toBe(totalDuration);
  });

  it("should not mutate the input events", () => {
    const snapshot = JSON.parse(JSON.stringify(initialEvents));

    reconcileTraceWithRunLifecycle(
      runData,
      rootSpanId,
      initialEvents as any,
      millisecondsToNanoseconds(1000)
    );

    expect(initialEvents).toEqual(snapshot);
  });
});

0 commit comments

Comments
 (0)