Skip to content

Commit 06fa3ff

Browse files
committed
Live reload the run until the run has been finished for 30s
- Send refresh pings every 5s - Throttle so we never update more than once per second - Stop auto-reloading when the run has been completed for >=30s
1 parent 1ac86b6 commit 06fa3ff

File tree

2 files changed

+54
-30
lines changed

2 files changed

+54
-30
lines changed

apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { PrismaClient, prisma } from "~/db.server";
1+
import { type PrismaClient, prisma } from "~/db.server";
22
import { logger } from "~/services/logger.server";
33
import { singleton } from "~/utils/singleton";
4-
import { createSSELoader } from "~/utils/sse";
4+
import { createSSELoader, SendFunction } from "~/utils/sse";
55
import { throttle } from "~/utils/throttle";
66
import { tracePubSub } from "~/v3/services/tracePubSub.server";
77

8-
const PING_INTERVAL = 1000;
9-
const STREAM_TIMEOUT = 30 * 1000; // 30 seconds
8+
const PING_INTERVAL = 5_000;
9+
const STREAM_TIMEOUT = 30_000;
1010

1111
export class RunStreamPresenter {
1212
#prismaClient: PrismaClient;
@@ -49,36 +49,40 @@ export class RunStreamPresenter {
4949
// Subscribe to trace updates
5050
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
5151

52-
// Store throttled send function and message listener for cleanup
53-
let throttledSend: ReturnType<typeof throttle> | undefined;
52+
// Only send max every 1 second
53+
const throttledSend = throttle(
54+
(args: { send: SendFunction; event?: string; data: string }) => {
55+
try {
56+
args.send(args);
57+
} catch (error) {
58+
if (error instanceof Error) {
59+
if (error.name !== "TypeError") {
60+
logger.debug("Error sending SSE in RunStreamPresenter", {
61+
error: {
62+
name: error.name,
63+
message: error.message,
64+
stack: error.stack,
65+
},
66+
});
67+
}
68+
}
69+
// Abort the stream on send error
70+
context.controller.abort("Send error");
71+
}
72+
},
73+
1000
74+
);
75+
5476
let messageListener: ((event: string) => void) | undefined;
5577

5678
return {
5779
initStream: ({ send }) => {
5880
// Create throttled send function
59-
throttledSend = throttle((args: { event?: string; data: string }) => {
60-
try {
61-
send(args);
62-
} catch (error) {
63-
if (error instanceof Error) {
64-
if (error.name !== "TypeError") {
65-
logger.debug("Error sending SSE in RunStreamPresenter", {
66-
error: {
67-
name: error.name,
68-
message: error.message,
69-
stack: error.stack,
70-
},
71-
});
72-
}
73-
}
74-
// Abort the stream on send error
75-
context.controller.abort("Send error");
76-
}
77-
}, 1000);
81+
throttledSend({ send, event: "message", data: new Date().toISOString() });
7882

7983
// Set up message listener for pub/sub events
8084
messageListener = (event: string) => {
81-
throttledSend?.({ data: event });
85+
throttledSend({ send, event: "message", data: event });
8286
};
8387
eventEmitter.addListener("message", messageListener);
8488

@@ -88,7 +92,8 @@ export class RunStreamPresenter {
8892
iterator: ({ send }) => {
8993
// Send ping to keep connection alive
9094
try {
91-
send({ event: "ping", data: new Date().toISOString() });
95+
// Send an actual message so the client refreshes
96+
throttledSend({ send, event: "message", data: new Date().toISOString() });
9297
} catch (error) {
9398
// If we can't send a ping, the connection is likely dead
9499
return false;

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,24 @@ export default function Page() {
436436
);
437437
}
438438

439+
function shouldLiveReload({
440+
events,
441+
maximumLiveReloadingSetting,
442+
run,
443+
}: {
444+
events: TraceEvent[];
445+
maximumLiveReloadingSetting: number;
446+
run: { completedAt: string | null };
447+
}): boolean {
448+
// We don't live reload if there are a ton of spans/logs
449+
if (events.length > maximumLiveReloadingSetting) return false;
450+
451+
// If the run was completed a while ago, we don't need to live reload anymore
452+
if (run.completedAt && new Date(run.completedAt).getTime() < Date.now() - 30_000) return false;
453+
454+
return true;
455+
}
456+
439457
function TraceView({
440458
run,
441459
trace,
@@ -453,18 +471,19 @@ function TraceView({
453471

454472
const { events, duration, rootSpanStatus, rootStartedAt, queuedDuration, overridesBySpanId } =
455473
trace;
456-
const shouldLiveReload = events.length <= maximumLiveReloadingSetting;
457474

458475
const changeToSpan = useDebounce((selectedSpan: string) => {
459476
replaceSearchParam("span", selectedSpan, { replace: true });
460477
}, 250);
461478

479+
const isLiveReloading = shouldLiveReload({ events, maximumLiveReloadingSetting, run });
480+
462481
const revalidator = useRevalidator();
463482
const streamedEvents = useEventSource(
464483
v3RunStreamingPath(organization, project, environment, run),
465484
{
466485
event: "message",
467-
disabled: !shouldLiveReload,
486+
disabled: !isLiveReloading,
468487
}
469488
);
470489
useEffect(() => {
@@ -511,7 +530,7 @@ function TraceView({
511530
rootStartedAt={rootStartedAt ? new Date(rootStartedAt) : undefined}
512531
queuedDuration={queuedDuration}
513532
environmentType={run.environment.type}
514-
shouldLiveReload={shouldLiveReload}
533+
shouldLiveReload={isLiveReloading}
515534
maximumLiveReloadingSetting={maximumLiveReloadingSetting}
516535
rootRun={run.rootTaskRun}
517536
parentRun={run.parentTaskRun}

0 commit comments

Comments
 (0)