Skip to content

Commit 8b14041

Browse files
fix(webapp): optimize reconciliation to O(1) and add trailing-edge throttle
1 parent 5eeb55d commit 8b14041

File tree

2 files changed

+89
-48
lines changed

2 files changed

+89
-48
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 69 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -208,19 +208,43 @@ export class RunPresenter {
208208

209209
//we need the start offset for each item, and the total duration of the entire tree
210210
const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0;
211+
212+
const postgresRunDuration =
213+
runData.isFinished && run.completedAt
214+
? millisecondsToNanoseconds(
215+
run.completedAt.getTime() -
216+
(run.rootTaskRun?.createdAt ?? run.createdAt).getTime()
217+
)
218+
: 0;
219+
211220
let totalDuration = tree?.data.duration ?? 0;
212221

213222
// Build the linkedRunIdBySpanId map during the same walk
214223
const linkedRunIdBySpanId: Record<string, string> = {};
215224

216225
const events = tree
217-
? flattenTree(tree).map((n) => {
218-
const offset = millisecondsToNanoseconds(
219-
n.data.startTime.getTime() - treeRootStartTimeMs
220-
);
226+
? flattenTree(tree).map((n, index) => {
227+
const isRoot = index === 0;
228+
const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs);
229+
230+
let nIsPartial = n.data.isPartial;
231+
let nDuration = n.data.duration;
232+
let nIsError = n.data.isError;
233+
234+
// NOTE: Clickhouse trace ingestion is eventually consistent.
235+
// When a run is marked finished in Postgres, we reconcile the
236+
// root span to reflect completion even if telemetry is still partial.
237+
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
238+
// run states in the dashboard.
239+
if (isRoot && runData.isFinished && nIsPartial) {
240+
nIsPartial = false;
241+
nDuration = Math.max(nDuration ?? 0, postgresRunDuration);
242+
nIsError = isFailedRunStatus(runData.status);
243+
}
244+
221245
//only let non-debug events extend the total duration
222246
if (!n.data.isDebug) {
223-
totalDuration = Math.max(totalDuration, offset + n.data.duration);
247+
totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration));
224248
}
225249

226250
// For cached spans, store the mapping from spanId to the linked run's ID
@@ -238,23 +262,24 @@ export class RunPresenter {
238262
treeRootStartTimeMs
239263
),
240264
//set partial nodes to null duration
241-
duration: n.data.isPartial ? null : n.data.duration,
265+
duration: nIsPartial ? null : nDuration,
266+
isPartial: nIsPartial,
267+
isError: nIsError,
242268
offset,
243-
isRoot: n.id === traceSummary.rootSpan.id,
269+
isRoot,
244270
},
245271
};
246272
})
247273
: [];
248274

275+
if (runData.isFinished) {
276+
totalDuration = Math.max(totalDuration, postgresRunDuration);
277+
}
278+
249279
//total duration should be a minimum of 1ms
250280
totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1));
251281

252-
const reconciled = reconcileTraceWithRunLifecycle(
253-
runData,
254-
traceSummary.rootSpan.id,
255-
events,
256-
totalDuration
257-
);
282+
const reconciled = reconcileTraceWithRunLifecycle(runData, traceSummary.rootSpan.id, events, totalDuration);
258283

259284
return {
260285
run: runData,
@@ -296,14 +321,17 @@ export function reconcileTraceWithRunLifecycle(
296321
totalDuration: number;
297322
rootSpanStatus: "executing" | "completed" | "failed";
298323
} {
299-
const rootEvent = events.find((e) => e.id === rootSpanId);
300-
const currentStatus: "executing" | "completed" | "failed" = rootEvent
301-
? rootEvent.data.isError
302-
? "failed"
303-
: !rootEvent.data.isPartial
304-
? "completed"
305-
: "executing"
306-
: "executing";
324+
const rootEvent = events[0];
325+
const isActualRoot = rootEvent?.id === rootSpanId;
326+
327+
const currentStatus: "executing" | "completed" | "failed" =
328+
isActualRoot && rootEvent
329+
? rootEvent.data.isError
330+
? "failed"
331+
: !rootEvent.data.isPartial
332+
? "completed"
333+
: "executing"
334+
: "executing";
307335

308336
if (!runData.isFinished) {
309337
return { events, totalDuration, rootSpanStatus: currentStatus };
@@ -318,23 +346,28 @@ export function reconcileTraceWithRunLifecycle(
318346

319347
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
320348

321-
const updatedEvents = events.map((e) => {
322-
if (e.id === rootSpanId && e.data.isPartial) {
323-
return {
324-
...e,
325-
data: {
326-
...e.data,
327-
isPartial: false,
328-
duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
329-
isError: isFailedRunStatus(runData.status),
330-
},
331-
};
332-
}
333-
return e;
334-
});
349+
// We only need to potentially update the root event (the first one) if it matches our ID
350+
if (isActualRoot && rootEvent && rootEvent.data.isPartial) {
351+
const updatedEvents = [...events];
352+
updatedEvents[0] = {
353+
...rootEvent,
354+
data: {
355+
...rootEvent.data,
356+
isPartial: false,
357+
duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration),
358+
isError: isFailedRunStatus(runData.status),
359+
},
360+
};
361+
362+
return {
363+
events: updatedEvents,
364+
totalDuration: updatedTotalDuration,
365+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
366+
};
367+
}
335368

336369
return {
337-
events: updatedEvents,
370+
events,
338371
totalDuration: updatedTotalDuration,
339372
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
340373
};

apps/webapp/app/utils/throttle.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
1-
//From: https://kettanaito.com/blog/debounce-vs-throttle
2-
3-
/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */
1+
/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */
42
export function throttle(
53
func: (...args: any[]) => void,
64
durationMs: number
75
): (...args: any[]) => void {
8-
let isPrimedToFire = false;
9-
10-
return (...args: any[]) => {
11-
if (!isPrimedToFire) {
12-
isPrimedToFire = true;
6+
let timeoutId: NodeJS.Timeout | null = null;
7+
let nextArgs: any[] | null = null;
138

14-
setTimeout(() => {
15-
func(...args);
16-
isPrimedToFire = false;
17-
}, durationMs);
9+
const wrapped = (...args: any[]) => {
10+
if (timeoutId) {
11+
nextArgs = args;
12+
return;
1813
}
14+
15+
func(...args);
16+
17+
timeoutId = setTimeout(() => {
18+
timeoutId = null;
19+
if (nextArgs) {
20+
const argsToUse = nextArgs;
21+
nextArgs = null;
22+
wrapped(...argsToUse);
23+
}
24+
}, durationMs);
1925
};
26+
27+
return wrapped;
2028
}

0 commit comments

Comments
 (0)