Skip to content

Commit 9500f2e

Browse files
refactor(webapp): complete modularization of reconciliation logic
1 parent 43897e3 commit 9500f2e

File tree

2 files changed

+27
-72
lines changed

2 files changed

+27
-72
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 26 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
88
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
99
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1010
import { env } from "~/env.server";
11+
import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server";
1112

1213
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
1314
export type Run = Result["run"];
@@ -124,6 +125,7 @@ export class RunPresenter {
124125
isFinished: isFinalRunStatus(run.status),
125126
startedAt: run.startedAt,
126127
completedAt: run.completedAt,
128+
createdAt: run.createdAt,
127129
logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt,
128130
rootTaskRun: run.rootTaskRun,
129131
parentTaskRun: run.parentTaskRun,
@@ -214,13 +216,28 @@ export class RunPresenter {
214216
const linkedRunIdBySpanId: Record<string, string> = {};
215217

216218
const events = tree
217-
? flattenTree(tree).map((n) => {
218-
const offset = millisecondsToNanoseconds(
219-
n.data.startTime.getTime() - treeRootStartTimeMs
220-
);
219+
? flattenTree(tree).map((n, index) => {
220+
const isRoot = index === 0;
221+
const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs);
222+
223+
let nIsPartial = n.data.isPartial;
224+
let nDuration = n.data.duration;
225+
let nIsError = n.data.isError;
226+
227+
// NOTE: Clickhouse trace ingestion is eventually consistent.
228+
// When a run is marked finished in Postgres, we reconcile the
229+
// root span to reflect completion even if telemetry is still partial.
230+
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
231+
// run states in the dashboard.
232+
if (isRoot && runData.isFinished && nIsPartial) {
233+
nIsPartial = false;
234+
nDuration = Math.max(nDuration ?? 0, postgresRunDuration);
235+
nIsError = isFailedRunStatus(runData.status);
236+
}
237+
221238
//only let non-debug events extend the total duration
222239
if (!n.data.isDebug) {
223-
totalDuration = Math.max(totalDuration, offset + n.data.duration);
240+
totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration));
224241
}
225242

226243
// For cached spans, store the mapping from spanId to the linked run's ID
@@ -238,9 +255,11 @@ export class RunPresenter {
238255
treeRootStartTimeMs
239256
),
240257
//set partial nodes to null duration
241-
duration: n.data.isPartial ? null : n.data.duration,
258+
duration: nIsPartial ? null : nDuration,
259+
isPartial: nIsPartial,
260+
isError: nIsError,
242261
offset,
243-
isRoot: n.id === traceSummary.rootSpan.id,
262+
isRoot,
244263
},
245264
};
246265
})
@@ -275,67 +294,3 @@ export class RunPresenter {
275294
}
276295
}
277296

278-
// NOTE: Clickhouse trace ingestion is eventually consistent.
279-
// When a run is marked finished in Postgres, we reconcile the
280-
// root span to reflect completion even if telemetry is still partial.
281-
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
282-
// run states in the dashboard.
283-
export function reconcileTraceWithRunLifecycle(
284-
runData: {
285-
isFinished: boolean;
286-
status: Run["status"];
287-
createdAt: Date;
288-
completedAt: Date | null;
289-
rootTaskRun: { createdAt: Date } | null;
290-
},
291-
rootSpanId: string,
292-
events: RunEvent[],
293-
totalDuration: number
294-
): {
295-
events: RunEvent[];
296-
totalDuration: number;
297-
rootSpanStatus: "executing" | "completed" | "failed";
298-
} {
299-
const rootEvent = events.find((e) => e.id === rootSpanId);
300-
const currentStatus: "executing" | "completed" | "failed" = rootEvent
301-
? rootEvent.data.isError
302-
? "failed"
303-
: !rootEvent.data.isPartial
304-
? "completed"
305-
: "executing"
306-
: "executing";
307-
308-
if (!runData.isFinished) {
309-
return { events, totalDuration, rootSpanStatus: currentStatus };
310-
}
311-
312-
const postgresRunDuration = runData.completedAt
313-
? millisecondsToNanoseconds(
314-
runData.completedAt.getTime() -
315-
(runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime()
316-
)
317-
: 0;
318-
319-
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
320-
321-
const updatedEvents = events.map((e) => {
322-
if (e.id === rootSpanId && e.data.isPartial) {
323-
return {
324-
...e,
325-
data: {
326-
...e.data,
327-
isPartial: false,
328-
duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
329-
isError: isFailedRunStatus(runData.status),
330-
},
331-
};
332-
}
333-
return e;
334-
});
335-
336-
return {
337-
events: updatedEvents,
338-
totalDuration: updatedTotalDuration,
339-
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
340-
};
341-
}

apps/webapp/test/RunPresenter.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ vi.mock("../app/utils/username", () => ({
2424
getUsername: vi.fn(),
2525
}));
2626

27-
import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/RunPresenter.server";
27+
import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/reconcileTrace.server";
2828
import { millisecondsToNanoseconds } from "@trigger.dev/core/v3";
2929

3030
describe("reconcileTraceWithRunLifecycle", () => {

0 commit comments

Comments
 (0)