Skip to content

Commit 0fd10c9

Browse files
committed
add execution heartbeat service
1 parent 81b8c4b commit 0fd10c9

File tree

2 files changed

+232
-96
lines changed

2 files changed

+232
-96
lines changed

packages/cli-v3/src/entryPoints/managed-run-controller.ts

Lines changed: 49 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { io, type Socket } from "socket.io-client";
2525
import { RunnerEnv } from "./managed/env.js";
2626
import { MetadataClient } from "./managed/overrides.js";
2727
import { RunLogger, SendDebugLogOptions } from "./managed/logger.js";
28+
import { RunExecutionHeartbeat } from "./managed/heartbeat.js";
2829

2930
const env = new RunnerEnv(stdEnv);
3031

@@ -54,45 +55,7 @@ class ManagedRunController {
5455
private readonly logger: RunLogger;
5556

5657
private readonly runHeartbeat: HeartbeatService;
57-
private readonly snapshotPoller: HeartbeatService;
58-
59-
get heartbeatIntervalSeconds() {
60-
return env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS;
61-
}
62-
63-
get snapshotPollIntervalSeconds() {
64-
return env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS;
65-
}
66-
67-
get runnerId() {
68-
return env.TRIGGER_RUNNER_ID;
69-
}
70-
71-
get successExitCode() {
72-
return env.TRIGGER_SUCCESS_EXIT_CODE;
73-
}
74-
75-
get failureExitCode() {
76-
return env.TRIGGER_FAILURE_EXIT_CODE;
77-
}
78-
79-
get workerApiUrl() {
80-
return env.TRIGGER_SUPERVISOR_API_URL;
81-
}
82-
83-
get workerInstanceName() {
84-
return env.TRIGGER_WORKER_INSTANCE_NAME;
85-
}
86-
87-
private state:
88-
| {
89-
phase: "RUN";
90-
run: Run;
91-
snapshot: Snapshot;
92-
}
93-
| {
94-
phase: "IDLE" | "WARM_START";
95-
} = { phase: "IDLE" };
58+
private readonly snapshotPoller: RunExecutionHeartbeat;
9659

9760
constructor(opts: ManagedRunControllerOptions) {
9861
this.workerManifest = opts.workerManifest;
@@ -137,63 +100,14 @@ class ManagedRunController {
137100
});
138101
}
139102

140-
this.snapshotPoller = new HeartbeatService({
141-
heartbeat: async () => {
142-
if (!this.runFriendlyId) {
143-
this.sendDebugLog({
144-
runId: env.TRIGGER_RUN_ID,
145-
message: "Skipping snapshot poll, no run ID",
146-
});
147-
return;
148-
}
149-
150-
this.sendDebugLog({
151-
runId: env.TRIGGER_RUN_ID,
152-
message: "Polling for latest snapshot",
153-
});
154-
155-
this.sendDebugLog({
156-
runId: this.runFriendlyId,
157-
message: `snapshot poll: started`,
158-
properties: {
159-
snapshotId: this.snapshotFriendlyId,
160-
},
161-
});
162-
163-
const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);
164-
165-
if (!response.success) {
166-
this.sendDebugLog({
167-
runId: this.runFriendlyId,
168-
message: "Snapshot poll failed",
169-
properties: {
170-
error: response.error,
171-
},
172-
});
173-
174-
this.sendDebugLog({
175-
runId: this.runFriendlyId,
176-
message: `snapshot poll: failed`,
177-
properties: {
178-
snapshotId: this.snapshotFriendlyId,
179-
error: response.error,
180-
},
181-
});
182-
183-
return;
184-
}
185-
186-
await this.handleSnapshotChange(response.data.execution);
187-
},
188-
intervalMs: this.snapshotPollIntervalSeconds * 1000,
189-
leadingEdge: false,
190-
onError: async (error) => {
191-
this.sendDebugLog({
192-
runId: this.runFriendlyId,
193-
message: "Failed to poll for snapshot",
194-
properties: { error: error instanceof Error ? error.message : String(error) },
195-
});
196-
},
103+
this.snapshotPoller = new RunExecutionHeartbeat({
104+
// @ts-expect-error
105+
runFriendlyId: env.TRIGGER_RUN_ID,
106+
// @ts-expect-error
107+
snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID,
108+
httpClient: this.httpClient,
109+
logger: this.logger,
110+
heartbeatIntervalSeconds: this.heartbeatIntervalSeconds,
197111
});
198112

199113
this.runHeartbeat = new HeartbeatService({
@@ -246,6 +160,44 @@ class ManagedRunController {
246160
});
247161
}
248162

163+
get heartbeatIntervalSeconds() {
164+
return env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS;
165+
}
166+
167+
get snapshotPollIntervalSeconds() {
168+
return env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS;
169+
}
170+
171+
get runnerId() {
172+
return env.TRIGGER_RUNNER_ID;
173+
}
174+
175+
get successExitCode() {
176+
return env.TRIGGER_SUCCESS_EXIT_CODE;
177+
}
178+
179+
get failureExitCode() {
180+
return env.TRIGGER_FAILURE_EXIT_CODE;
181+
}
182+
183+
get workerApiUrl() {
184+
return env.TRIGGER_SUPERVISOR_API_URL;
185+
}
186+
187+
get workerInstanceName() {
188+
return env.TRIGGER_WORKER_INSTANCE_NAME;
189+
}
190+
191+
private state:
192+
| {
193+
phase: "RUN";
194+
run: Run;
195+
snapshot: Snapshot;
196+
}
197+
| {
198+
phase: "IDLE" | "WARM_START";
199+
} = { phase: "IDLE" };
200+
249201
private enterRunPhase(run: Run, snapshot: Snapshot) {
250202
this.onExitRunPhase(run);
251203
this.state = { phase: "RUN", run, snapshot };
@@ -477,6 +429,7 @@ class ManagedRunController {
477429

478430
try {
479431
this.updateRunPhase(run, snapshot);
432+
this.snapshotPoller.updateSnapshotId(snapshot.friendlyId);
480433
} catch (error) {
481434
this.sendDebugLog({
482435
runId: run.friendlyId,
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import { HeartbeatService, RunExecutionData } from "@trigger.dev/core/v3";
2+
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
3+
import { RunLogger } from "./logger.js";
4+
5+
export type RunExecutionHeartbeatOptions = {
6+
runFriendlyId: string;
7+
snapshotFriendlyId: string;
8+
httpClient: WorkloadHttpClient;
9+
logger: RunLogger;
10+
heartbeatIntervalSeconds: number;
11+
};
12+
13+
export class RunExecutionHeartbeat {
14+
private readonly logger: RunLogger;
15+
private readonly heartbeat: HeartbeatService;
16+
private readonly httpClient: WorkloadHttpClient;
17+
18+
private readonly runFriendlyId: string;
19+
private snapshotFriendlyId: string;
20+
21+
constructor(opts: RunExecutionHeartbeatOptions) {
22+
this.logger = opts.logger;
23+
this.httpClient = opts.httpClient;
24+
25+
this.runFriendlyId = opts.runFriendlyId;
26+
this.snapshotFriendlyId = opts.snapshotFriendlyId;
27+
28+
this.heartbeat = new HeartbeatService({
29+
heartbeat: async () => {
30+
this.logger.sendDebugLog({
31+
runId: this.runFriendlyId,
32+
message: "heartbeat: started",
33+
});
34+
35+
const response = await this.httpClient.heartbeatRun(
36+
this.runFriendlyId,
37+
this.snapshotFriendlyId
38+
);
39+
40+
if (!response.success) {
41+
this.logger.sendDebugLog({
42+
runId: this.runFriendlyId,
43+
message: "heartbeat: failed",
44+
properties: {
45+
error: response.error,
46+
},
47+
});
48+
}
49+
},
50+
intervalMs: opts.heartbeatIntervalSeconds * 1000,
51+
leadingEdge: false,
52+
onError: async (error) => {
53+
this.logger.sendDebugLog({
54+
runId: this.runFriendlyId,
55+
message: "Failed to send heartbeat",
56+
properties: { error: error instanceof Error ? error.message : String(error) },
57+
});
58+
},
59+
});
60+
}
61+
62+
resetCurrentInterval() {
63+
this.heartbeat.resetCurrentInterval();
64+
}
65+
66+
updateSnapshotId(snapshotId: string) {
67+
this.snapshotFriendlyId = snapshotId;
68+
}
69+
70+
updateInterval(intervalMs: number) {
71+
this.heartbeat.updateInterval(intervalMs);
72+
}
73+
74+
start() {
75+
this.heartbeat.start();
76+
}
77+
78+
stop() {
79+
this.heartbeat.stop();
80+
}
81+
}
82+
83+
type RunExecutionSnapshotPollerOptions = {
84+
runFriendlyId: string;
85+
snapshotFriendlyId: string;
86+
httpClient: WorkloadHttpClient;
87+
logger: RunLogger;
88+
snapshotPollIntervalSeconds: number;
89+
handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
90+
};
91+
92+
class RunExecutionSnapshotPoller {
93+
private readonly logger: RunLogger;
94+
private readonly poller: HeartbeatService;
95+
private readonly httpClient: WorkloadHttpClient;
96+
97+
private readonly runFriendlyId: string;
98+
private readonly snapshotFriendlyId: string;
99+
private readonly snapshotPollIntervalSeconds: number;
100+
101+
private readonly handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
102+
103+
constructor(opts: RunExecutionSnapshotPollerOptions) {
104+
this.logger = opts.logger;
105+
this.httpClient = opts.httpClient;
106+
107+
this.runFriendlyId = opts.runFriendlyId;
108+
this.snapshotFriendlyId = opts.snapshotFriendlyId;
109+
110+
this.handleSnapshotChange = opts.handleSnapshotChange;
111+
112+
this.poller = new HeartbeatService({
113+
heartbeat: async () => {
114+
if (!this.runFriendlyId) {
115+
this.logger.sendDebugLog({
116+
runId: this.runFriendlyId,
117+
message: "Skipping snapshot poll, no run ID",
118+
});
119+
return;
120+
}
121+
122+
this.logger.sendDebugLog({
123+
runId: this.runFriendlyId,
124+
message: "Polling for latest snapshot",
125+
});
126+
127+
this.logger.sendDebugLog({
128+
runId: this.runFriendlyId,
129+
message: `snapshot poll: started`,
130+
properties: {
131+
snapshotId: this.snapshotFriendlyId,
132+
},
133+
});
134+
135+
const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);
136+
137+
if (!response.success) {
138+
this.logger.sendDebugLog({
139+
runId: this.runFriendlyId,
140+
message: "Snapshot poll failed",
141+
properties: {
142+
error: response.error,
143+
},
144+
});
145+
146+
this.logger.sendDebugLog({
147+
runId: this.runFriendlyId,
148+
message: `snapshot poll: failed`,
149+
properties: {
150+
snapshotId: this.snapshotFriendlyId,
151+
error: response.error,
152+
},
153+
});
154+
155+
return;
156+
}
157+
158+
await this.handleSnapshotChange(response.data.execution);
159+
},
160+
intervalMs: this.snapshotPollIntervalSeconds * 1000,
161+
leadingEdge: false,
162+
onError: async (error) => {
163+
this.logger.sendDebugLog({
164+
runId: this.runFriendlyId,
165+
message: "Failed to poll for snapshot",
166+
properties: { error: error instanceof Error ? error.message : String(error) },
167+
});
168+
},
169+
});
170+
}
171+
172+
updateInterval(intervalMs: number) {
173+
this.poller.updateInterval(intervalMs);
174+
}
175+
176+
start() {
177+
this.poller.start();
178+
}
179+
180+
stop() {
181+
this.poller.stop();
182+
}
183+
}

0 commit comments

Comments
 (0)