Skip to content

Commit 990ac46

Browse files
committed
add snapshot poller service
1 parent 0fd10c9 commit 990ac46

File tree

2 files changed

+130
-44
lines changed

2 files changed

+130
-44
lines changed

packages/cli-v3/src/entryPoints/managed-run-controller.ts

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { env as stdEnv } from "std-env";
33
import { readJSONFile } from "../utilities/fileSystem.js";
44
import {
55
type CompleteRunAttemptResult,
6-
HeartbeatService,
76
type RunExecutionData,
87
SuspendedProcessError,
98
type TaskRunExecutionMetrics,
@@ -26,6 +25,7 @@ import { RunnerEnv } from "./managed/env.js";
2625
import { MetadataClient } from "./managed/overrides.js";
2726
import { RunLogger, SendDebugLogOptions } from "./managed/logger.js";
2827
import { RunExecutionHeartbeat } from "./managed/heartbeat.js";
28+
import { RunExecutionSnapshotPoller } from "./managed/poller.js";
2929

3030
const env = new RunnerEnv(stdEnv);
3131

@@ -54,8 +54,8 @@ class ManagedRunController {
5454
private socket: Socket<WorkloadServerToClientEvents, WorkloadClientToServerEvents>;
5555
private readonly logger: RunLogger;
5656

57-
private readonly runHeartbeat: HeartbeatService;
58-
private readonly snapshotPoller: RunExecutionHeartbeat;
57+
private readonly runHeartbeat: RunExecutionHeartbeat;
58+
private readonly snapshotPoller: RunExecutionSnapshotPoller;
5959

6060
constructor(opts: ManagedRunControllerOptions) {
6161
this.workerManifest = opts.workerManifest;
@@ -100,55 +100,25 @@ class ManagedRunController {
100100
});
101101
}
102102

103-
this.snapshotPoller = new RunExecutionHeartbeat({
103+
this.snapshotPoller = new RunExecutionSnapshotPoller({
104104
// @ts-expect-error
105105
runFriendlyId: env.TRIGGER_RUN_ID,
106106
// @ts-expect-error
107107
snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID,
108108
httpClient: this.httpClient,
109109
logger: this.logger,
110-
heartbeatIntervalSeconds: this.heartbeatIntervalSeconds,
110+
snapshotPollIntervalSeconds: this.snapshotPollIntervalSeconds,
111+
handleSnapshotChange: this.handleSnapshotChange,
111112
});
112113

113-
this.runHeartbeat = new HeartbeatService({
114-
heartbeat: async () => {
115-
if (!this.runFriendlyId || !this.snapshotFriendlyId) {
116-
this.sendDebugLog({
117-
runId: this.runFriendlyId,
118-
message: "Skipping heartbeat, no run ID or snapshot ID",
119-
});
120-
return;
121-
}
122-
123-
this.sendDebugLog({
124-
runId: this.runFriendlyId,
125-
message: "heartbeat: started",
126-
});
127-
128-
const response = await this.httpClient.heartbeatRun(
129-
this.runFriendlyId,
130-
this.snapshotFriendlyId
131-
);
132-
133-
if (!response.success) {
134-
this.sendDebugLog({
135-
runId: this.runFriendlyId,
136-
message: "heartbeat: failed",
137-
properties: {
138-
error: response.error,
139-
},
140-
});
141-
}
142-
},
143-
intervalMs: this.heartbeatIntervalSeconds * 1000,
144-
leadingEdge: false,
145-
onError: async (error) => {
146-
this.sendDebugLog({
147-
runId: this.runFriendlyId,
148-
message: "Failed to send heartbeat",
149-
properties: { error: error instanceof Error ? error.message : String(error) },
150-
});
151-
},
114+
this.runHeartbeat = new RunExecutionHeartbeat({
115+
// @ts-expect-error
116+
runFriendlyId: env.TRIGGER_RUN_ID,
117+
// @ts-expect-error
118+
snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID,
119+
httpClient: this.httpClient,
120+
logger: this.logger,
121+
heartbeatIntervalSeconds: this.heartbeatIntervalSeconds,
152122
});
153123

154124
process.on("SIGTERM", async () => {
@@ -429,6 +399,8 @@ class ManagedRunController {
429399

430400
try {
431401
this.updateRunPhase(run, snapshot);
402+
403+
this.runHeartbeat.updateSnapshotId(snapshot.friendlyId);
432404
this.snapshotPoller.updateSnapshotId(snapshot.friendlyId);
433405
} catch (error) {
434406
this.sendDebugLog({
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
2+
import { RunLogger } from "./logger.js";
3+
import { HeartbeatService, RunExecutionData } from "@trigger.dev/core/v3";
4+
5+
export type RunExecutionSnapshotPollerOptions = {
6+
runFriendlyId: string;
7+
snapshotFriendlyId: string;
8+
httpClient: WorkloadHttpClient;
9+
logger: RunLogger;
10+
snapshotPollIntervalSeconds: number;
11+
handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
12+
};
13+
14+
export class RunExecutionSnapshotPoller {
15+
private readonly logger: RunLogger;
16+
private readonly poller: HeartbeatService;
17+
private readonly httpClient: WorkloadHttpClient;
18+
19+
private readonly runFriendlyId: string;
20+
private snapshotFriendlyId: string;
21+
22+
private readonly snapshotPollIntervalSeconds: number;
23+
24+
private readonly handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
25+
26+
constructor(opts: RunExecutionSnapshotPollerOptions) {
27+
this.logger = opts.logger;
28+
this.httpClient = opts.httpClient;
29+
30+
this.runFriendlyId = opts.runFriendlyId;
31+
this.snapshotFriendlyId = opts.snapshotFriendlyId;
32+
33+
this.handleSnapshotChange = opts.handleSnapshotChange;
34+
35+
this.poller = new HeartbeatService({
36+
heartbeat: async () => {
37+
if (!this.runFriendlyId) {
38+
this.logger.sendDebugLog({
39+
runId: this.runFriendlyId,
40+
message: "Skipping snapshot poll, no run ID",
41+
});
42+
return;
43+
}
44+
45+
this.logger.sendDebugLog({
46+
runId: this.runFriendlyId,
47+
message: "Polling for latest snapshot",
48+
});
49+
50+
this.logger.sendDebugLog({
51+
runId: this.runFriendlyId,
52+
message: `snapshot poll: started`,
53+
properties: {
54+
snapshotId: this.snapshotFriendlyId,
55+
},
56+
});
57+
58+
const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);
59+
60+
if (!response.success) {
61+
this.logger.sendDebugLog({
62+
runId: this.runFriendlyId,
63+
message: "Snapshot poll failed",
64+
properties: {
65+
error: response.error,
66+
},
67+
});
68+
69+
this.logger.sendDebugLog({
70+
runId: this.runFriendlyId,
71+
message: `snapshot poll: failed`,
72+
properties: {
73+
snapshotId: this.snapshotFriendlyId,
74+
error: response.error,
75+
},
76+
});
77+
78+
return;
79+
}
80+
81+
await this.handleSnapshotChange(response.data.execution);
82+
},
83+
intervalMs: this.snapshotPollIntervalSeconds * 1000,
84+
leadingEdge: false,
85+
onError: async (error) => {
86+
this.logger.sendDebugLog({
87+
runId: this.runFriendlyId,
88+
message: "Failed to poll for snapshot",
89+
properties: { error: error instanceof Error ? error.message : String(error) },
90+
});
91+
},
92+
});
93+
}
94+
95+
resetCurrentInterval() {
96+
this.poller.resetCurrentInterval();
97+
}
98+
99+
updateSnapshotId(snapshotId: string) {
100+
this.snapshotFriendlyId = snapshotId;
101+
}
102+
103+
updateInterval(intervalMs: number) {
104+
this.poller.updateInterval(intervalMs);
105+
}
106+
107+
start() {
108+
this.poller.start();
109+
}
110+
111+
stop() {
112+
this.poller.stop();
113+
}
114+
}

0 commit comments

Comments
 (0)