Skip to content

Commit 43897e3

Browse files
refactor(webapp): move reconciliation logic to separate file for better testability
1 parent c027545 commit 43897e3

File tree

2 files changed

+125
-70
lines changed

2 files changed

+125
-70
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 36 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ export class RunPresenter {
124124
isFinished: isFinalRunStatus(run.status),
125125
startedAt: run.startedAt,
126126
completedAt: run.completedAt,
127-
createdAt: run.createdAt,
128127
logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt,
129128
rootTaskRun: run.rootTaskRun,
130129
parentTaskRun: run.parentTaskRun,
@@ -209,43 +208,19 @@ export class RunPresenter {
209208

210209
//we need the start offset for each item, and the total duration of the entire tree
211210
const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0;
212-
213-
const postgresRunDuration =
214-
runData.isFinished && run.completedAt
215-
? millisecondsToNanoseconds(
216-
run.completedAt.getTime() -
217-
(run.rootTaskRun?.createdAt ?? run.createdAt).getTime()
218-
)
219-
: 0;
220-
221211
let totalDuration = tree?.data.duration ?? 0;
222212

223213
// Build the linkedRunIdBySpanId map during the same walk
224214
const linkedRunIdBySpanId: Record<string, string> = {};
225215

226216
const events = tree
227-
? flattenTree(tree).map((n, index) => {
228-
const isRoot = index === 0;
229-
const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs);
230-
231-
let nIsPartial = n.data.isPartial;
232-
let nDuration = n.data.duration;
233-
let nIsError = n.data.isError;
234-
235-
// NOTE: Clickhouse trace ingestion is eventually consistent.
236-
// When a run is marked finished in Postgres, we reconcile the
237-
// root span to reflect completion even if telemetry is still partial.
238-
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
239-
// run states in the dashboard.
240-
if (isRoot && runData.isFinished && nIsPartial) {
241-
nIsPartial = false;
242-
nDuration = Math.max(nDuration ?? 0, postgresRunDuration);
243-
nIsError = isFailedRunStatus(runData.status);
244-
}
245-
217+
? flattenTree(tree).map((n) => {
218+
const offset = millisecondsToNanoseconds(
219+
n.data.startTime.getTime() - treeRootStartTimeMs
220+
);
246221
//only let non-debug events extend the total duration
247222
if (!n.data.isDebug) {
248-
totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration));
223+
totalDuration = Math.max(totalDuration, offset + n.data.duration);
249224
}
250225

251226
// For cached spans, store the mapping from spanId to the linked run's ID
@@ -263,24 +238,23 @@ export class RunPresenter {
263238
treeRootStartTimeMs
264239
),
265240
//set partial nodes to null duration
266-
duration: nIsPartial ? null : nDuration,
267-
isPartial: nIsPartial,
268-
isError: nIsError,
241+
duration: n.data.isPartial ? null : n.data.duration,
269242
offset,
270-
isRoot,
243+
isRoot: n.id === traceSummary.rootSpan.id,
271244
},
272245
};
273246
})
274247
: [];
275248

276-
if (runData.isFinished) {
277-
totalDuration = Math.max(totalDuration, postgresRunDuration);
278-
}
279-
280249
//total duration should be a minimum of 1ms
281250
totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1));
282251

283-
const reconciled = reconcileTraceWithRunLifecycle(runData, traceSummary.rootSpan.id, events, totalDuration);
252+
const reconciled = reconcileTraceWithRunLifecycle(
253+
runData,
254+
traceSummary.rootSpan.id,
255+
events,
256+
totalDuration
257+
);
284258

285259
return {
286260
run: runData,
@@ -322,17 +296,14 @@ export function reconcileTraceWithRunLifecycle(
322296
totalDuration: number;
323297
rootSpanStatus: "executing" | "completed" | "failed";
324298
} {
325-
const rootEvent = events[0];
326-
const isActualRoot = rootEvent?.id === rootSpanId;
327-
328-
const currentStatus: "executing" | "completed" | "failed" =
329-
isActualRoot && rootEvent
330-
? rootEvent.data.isError
331-
? "failed"
332-
: !rootEvent.data.isPartial
333-
? "completed"
334-
: "executing"
335-
: "executing";
299+
const rootEvent = events.find((e) => e.id === rootSpanId);
300+
const currentStatus: "executing" | "completed" | "failed" = rootEvent
301+
? rootEvent.data.isError
302+
? "failed"
303+
: !rootEvent.data.isPartial
304+
? "completed"
305+
: "executing"
306+
: "executing";
336307

337308
if (!runData.isFinished) {
338309
return { events, totalDuration, rootSpanStatus: currentStatus };
@@ -347,28 +318,23 @@ export function reconcileTraceWithRunLifecycle(
347318

348319
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
349320

350-
// We only need to potentially update the root event (the first one) if it matches our ID
351-
if (isActualRoot && rootEvent && rootEvent.data.isPartial) {
352-
const updatedEvents = [...events];
353-
updatedEvents[0] = {
354-
...rootEvent,
355-
data: {
356-
...rootEvent.data,
357-
isPartial: false,
358-
duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration),
359-
isError: isFailedRunStatus(runData.status),
360-
},
361-
};
362-
363-
return {
364-
events: updatedEvents,
365-
totalDuration: updatedTotalDuration,
366-
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
367-
};
368-
}
321+
const updatedEvents = events.map((e) => {
322+
if (e.id === rootSpanId && e.data.isPartial) {
323+
return {
324+
...e,
325+
data: {
326+
...e.data,
327+
isPartial: false,
328+
duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
329+
isError: isFailedRunStatus(runData.status),
330+
},
331+
};
332+
}
333+
return e;
334+
});
369335

370336
return {
371-
events,
337+
events: updatedEvents,
372338
totalDuration: updatedTotalDuration,
373339
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
374340
};
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { millisecondsToNanoseconds } from "@trigger.dev/core/v3";
2+
import { isFailedRunStatus } from "~/v3/taskStatus";
3+
import type { TaskRunStatus } from "@trigger.dev/database";
4+
5+
export type ReconcileRunData = {
6+
isFinished: boolean;
7+
status: TaskRunStatus;
8+
createdAt: Date;
9+
completedAt: Date | null;
10+
rootTaskRun: { createdAt: Date } | null;
11+
};
12+
13+
export type ReconcileEvent = {
14+
id: string;
15+
data: {
16+
isPartial: boolean;
17+
isError: boolean;
18+
duration?: number | null;
19+
};
20+
};
21+
22+
export type ReconcileResult = {
23+
events: any[];
24+
totalDuration: number;
25+
rootSpanStatus: "executing" | "completed" | "failed";
26+
};
27+
28+
// NOTE: Clickhouse trace ingestion is eventually consistent.
29+
// When a run is marked finished in Postgres, we reconcile the
30+
// root span to reflect completion even if telemetry is still partial.
31+
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
32+
// run states in the dashboard.
33+
export function reconcileTraceWithRunLifecycle(
34+
runData: ReconcileRunData,
35+
rootSpanId: string,
36+
events: any[],
37+
totalDuration: number
38+
): ReconcileResult {
39+
const rootEvent = events[0];
40+
const isActualRoot = rootEvent?.id === rootSpanId;
41+
42+
const currentStatus: "executing" | "completed" | "failed" =
43+
isActualRoot && rootEvent
44+
? rootEvent.data.isError
45+
? "failed"
46+
: !rootEvent.data.isPartial
47+
? "completed"
48+
: "executing"
49+
: "executing";
50+
51+
if (!runData.isFinished) {
52+
return { events, totalDuration, rootSpanStatus: currentStatus };
53+
}
54+
55+
const postgresRunDuration = runData.completedAt
56+
? millisecondsToNanoseconds(
57+
runData.completedAt.getTime() -
58+
(runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime()
59+
)
60+
: 0;
61+
62+
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
63+
64+
// We only need to potentially update the root event (the first one) if it matches our ID
65+
if (isActualRoot && rootEvent && rootEvent.data.isPartial) {
66+
const updatedEvents = [...events];
67+
updatedEvents[0] = {
68+
...rootEvent,
69+
data: {
70+
...rootEvent.data,
71+
isPartial: false,
72+
duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration),
73+
isError: isFailedRunStatus(runData.status),
74+
},
75+
};
76+
77+
return {
78+
events: updatedEvents,
79+
totalDuration: updatedTotalDuration,
80+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
81+
};
82+
}
83+
84+
return {
85+
events,
86+
totalDuration: updatedTotalDuration,
87+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
88+
};
89+
}

0 commit comments

Comments
 (0)