Skip to content

Commit d5e7158

Browse files
committed
WIP
1 parent b57993e commit d5e7158

File tree

11 files changed

+306
-49
lines changed

11 files changed

+306
-49
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,7 @@ const EnvironmentSchema = z
219219
.string()
220220
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
221221
REALTIME_STREAMS_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
222-
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce
223-
.number()
224-
.int()
225-
.default(60000 * 5), // 5 minutes
222+
REALTIME_STREAMS_INACTIVITY_TIMEOUT_MS: z.coerce.number().int().default(60000), // 1 minute
226223

227224
REALTIME_MAXIMUM_CREATED_AT_FILTER_AGE_IN_MS: z.coerce
228225
.number()
@@ -1235,6 +1232,7 @@ const EnvironmentSchema = z
12351232
REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
12361233
REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
12371234
REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
1235+
WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(60_000),
12381236
})
12391237
.and(GithubAppEnvSchema)
12401238
.and(S2EnvSchema);

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,19 +149,19 @@ traceContext.setGlobalManager(standardTraceContextManager);
149149

150150
const durableClock = new DurableClock();
151151
clock.setGlobalClock(durableClock);
152-
const runMetadataManager = new StandardMetadataManager(
153-
apiClientManager.clientOrThrow(),
154-
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
155-
);
152+
const runMetadataManager = new StandardMetadataManager(apiClientManager.clientOrThrow());
156153
runMetadata.setGlobalManager(runMetadataManager);
157154

158155
const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager(
159156
apiClientManager.clientOrThrow(),
160-
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
157+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
158+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
159+
false
161160
);
162161
realtimeStreams.setGlobalManager(standardRealtimeStreamsManager);
163162

164-
const waitUntilManager = new StandardWaitUntilManager();
163+
const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000);
164+
const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs);
165165
waitUntil.setGlobalManager(waitUntilManager);
166166

167167
const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");
@@ -531,10 +531,6 @@ const zodIpc = new ZodIpcConnection({
531531

532532
runMetadataManager.runId = execution.run.id;
533533
runMetadataManager.runIdIsRoot = typeof execution.run.rootTaskRunId === "undefined";
534-
runMetadataManager.streamsVersion =
535-
typeof execution.run.realtimeStreamsVersion === "undefined"
536-
? "v1"
537-
: execution.run.realtimeStreamsVersion;
538534

539535
_executionCount++;
540536

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
WorkerToExecutorMessageCatalog,
3232
traceContext,
3333
heartbeats,
34+
realtimeStreams,
3435
} from "@trigger.dev/core/v3";
3536
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
3637
import {
@@ -57,6 +58,7 @@ import {
5758
UsageTimeoutManager,
5859
StandardTraceContextManager,
5960
StandardHeartbeatsManager,
61+
StandardRealtimeStreamsManager,
6062
} from "@trigger.dev/core/v3/workers";
6163
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
6264
import { readFile } from "node:fs/promises";
@@ -127,11 +129,16 @@ clock.setGlobalClock(durableClock);
127129
const standardTraceContextManager = new StandardTraceContextManager();
128130
traceContext.setGlobalManager(standardTraceContextManager);
129131

130-
const runMetadataManager = new StandardMetadataManager(
132+
const runMetadataManager = new StandardMetadataManager(apiClientManager.clientOrThrow());
133+
runMetadata.setGlobalManager(runMetadataManager);
134+
135+
const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager(
131136
apiClientManager.clientOrThrow(),
132-
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
137+
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev",
138+
(getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ??
139+
false
133140
);
134-
runMetadata.setGlobalManager(runMetadataManager);
141+
realtimeStreams.setGlobalManager(standardRealtimeStreamsManager);
135142

136143
const waitUntilManager = new StandardWaitUntilManager();
137144
waitUntil.setGlobalManager(waitUntilManager);
@@ -292,6 +299,7 @@ function resetExecutionEnvironment() {
292299
timeout.reset();
293300
runMetadataManager.reset();
294301
waitUntilManager.reset();
302+
standardRealtimeStreamsManager.reset();
295303
_sharedWorkerRuntime?.reset();
296304
durableClock.reset();
297305
taskContext.disable();
@@ -300,8 +308,8 @@ function resetExecutionEnvironment() {
300308

301309
// Wait for all streams to finish before completing the run
302310
waitUntil.register({
303-
requiresResolving: () => runMetadataManager.hasActiveStreams(),
304-
promise: () => runMetadataManager.waitForAllStreams(),
311+
requiresResolving: () => standardRealtimeStreamsManager.hasActiveStreams(),
312+
promise: () => standardRealtimeStreamsManager.waitForAllStreams(),
305313
});
306314

307315
console.log(`[${new Date().toISOString()}] Reset execution environment`);

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import { S2MetadataStream } from "../runMetadata/s2MetadataStream.js";
1515
export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1616
constructor(
1717
private apiClient: ApiClient,
18-
private baseUrl: string
18+
private baseUrl: string,
19+
private debug: boolean = false
1920
) {}
2021
// Add a Map to track active streams
2122
private activeStreams = new Map<string, { wait: () => Promise<void> }>();
@@ -65,6 +66,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
6566
source,
6667
signal: options?.signal,
6768
limiter: (await import("p-limit")).default,
69+
debug: this.debug,
6870
});
6971

7072
this.activeStreams.set(key, streamInstance);

packages/core/src/v3/runMetadata/s2MetadataStream.ts

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export type S2MetadataStreamOptions<T = any> = {
2020
signal?: AbortSignal;
2121
flushIntervalMs?: number; // How often to flush batched chunks (default 200ms)
2222
maxRetries?: number; // Max number of retries for failed flushes (default 10)
23+
debug?: boolean; // Enable debug logging (default false)
2324
};
2425

2526
/**
@@ -30,6 +31,7 @@ export type S2MetadataStreamOptions<T = any> = {
3031
* - Periodic flushing: Flushes buffered chunks every ~200ms (configurable)
3132
* - Sequential writes: Uses p-limit to ensure writes happen in order
3233
* - Automatic retries: Retries failed writes with exponential backoff
34+
* - Debug logging: Enable with debug: true to see detailed operation logs
3335
*
3436
* Example usage:
3537
* ```typescript
@@ -39,6 +41,7 @@ export type S2MetadataStreamOptions<T = any> = {
3941
* accessToken: "s2-token-here",
4042
* source: myAsyncIterable,
4143
* flushIntervalMs: 200, // Optional: flush every 200ms
44+
* debug: true, // Optional: enable debug logging
4245
* });
4346
*
4447
* // Wait for streaming to complete
@@ -57,6 +60,7 @@ export class S2MetadataStream<T = any> implements StreamInstance {
5760
private streamPromise: Promise<void>;
5861
private readonly flushIntervalMs: number;
5962
private readonly maxRetries: number;
63+
private readonly debug: boolean;
6064

6165
// Buffering state
6266
private streamComplete = false;
@@ -74,11 +78,16 @@ export class S2MetadataStream<T = any> implements StreamInstance {
7478

7579
constructor(private options: S2MetadataStreamOptions<T>) {
7680
this.limiter = options.limiter(1);
81+
this.debug = options.debug ?? false;
7782

7883
this.s2Client = new S2({ accessToken: options.accessToken });
7984
this.flushIntervalMs = options.flushIntervalMs ?? 200;
8085
this.maxRetries = options.maxRetries ?? 10;
8186

87+
this.log(
88+
`[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxRetries=${this.maxRetries}`
89+
);
90+
8291
const [serverStream, consumerStream] = this.createTeeStreams();
8392
this.serverStream = serverStream;
8493
this.consumerStream = consumerStream;
@@ -105,7 +114,6 @@ export class S2MetadataStream<T = any> implements StreamInstance {
105114

106115
controller.close();
107116
} catch (error) {
108-
console.error("[S2MetadataStream] Error reading from source", error);
109117
controller.error(error);
110118
}
111119
},
@@ -115,6 +123,7 @@ export class S2MetadataStream<T = any> implements StreamInstance {
115123
}
116124

117125
private startBuffering(): void {
126+
this.log("[S2MetadataStream] Starting buffering task");
118127
this.streamReader = this.serverStream.getReader();
119128

120129
this.bufferReaderTask = (async () => {
@@ -126,20 +135,29 @@ export class S2MetadataStream<T = any> implements StreamInstance {
126135

127136
if (done) {
128137
this.streamComplete = true;
138+
this.log(`[S2MetadataStream] Stream complete after ${chunkCount} chunks`);
129139
break;
130140
}
131141

132142
// Add to pending flushes
133143
this.pendingFlushes.push(value);
134144
chunkCount++;
145+
146+
if (chunkCount % 100 === 0) {
147+
this.log(
148+
`[S2MetadataStream] Buffered ${chunkCount} chunks, pending flushes: ${this.pendingFlushes.length}`
149+
);
150+
}
135151
}
136152
} catch (error) {
153+
this.logError("[S2MetadataStream] Error in buffering task:", error);
137154
throw error;
138155
}
139156
})();
140157
}
141158

142159
private startPeriodicFlush(): void {
160+
this.log(`[S2MetadataStream] Starting periodic flush (every ${this.flushIntervalMs}ms)`);
143161
this.flushInterval = setInterval(() => {
144162
this.flush().catch(() => {
145163
// Errors are already logged in flush()
@@ -154,10 +172,10 @@ export class S2MetadataStream<T = any> implements StreamInstance {
154172

155173
// Take all pending chunks
156174
const chunksToFlush = this.pendingFlushes.splice(0);
175+
this.log(`[S2MetadataStream] Flushing ${chunksToFlush.length} chunks to S2`);
157176

158177
// Add flush to limiter queue to ensure sequential execution
159178
const flushPromise = this.limiter(async () => {
160-
const startTime = Date.now();
161179
try {
162180
// Convert chunks to S2 record format (body as JSON string)
163181
const records = chunksToFlush.map((data) => ({
@@ -170,32 +188,31 @@ export class S2MetadataStream<T = any> implements StreamInstance {
170188
appendInput: { records },
171189
});
172190

173-
const duration = Date.now() - startTime;
191+
this.log(`[S2MetadataStream] Successfully flushed ${chunksToFlush.length} chunks`);
174192

175193
// Reset retry count on success
176194
this.retryCount = 0;
177195
} catch (error) {
178-
console.error("[S2MetadataStream] Flush error", {
179-
error,
180-
count: chunksToFlush.length,
181-
retryCount: this.retryCount,
182-
});
183-
184196
// Handle retryable errors
185197
if (this.isRetryableError(error) && this.retryCount < this.maxRetries) {
186198
this.retryCount++;
187199
const delayMs = this.calculateBackoffDelay();
188200

201+
this.logError(
202+
`[S2MetadataStream] Flush failed (attempt ${this.retryCount}/${this.maxRetries}), retrying in ${delayMs}ms:`,
203+
error
204+
);
205+
189206
await this.delay(delayMs);
190207

191208
// Re-add chunks to pending flushes and retry
192209
this.pendingFlushes.unshift(...chunksToFlush);
193210
await this.flush();
194211
} else {
195-
console.error("[S2MetadataStream] Max retries exceeded or non-retryable error", {
196-
retryCount: this.retryCount,
197-
maxRetries: this.maxRetries,
198-
});
212+
this.logError(
213+
`[S2MetadataStream] Flush failed permanently after ${this.retryCount} retries:`,
214+
error
215+
);
199216
throw error;
200217
}
201218
}
@@ -205,20 +222,28 @@ export class S2MetadataStream<T = any> implements StreamInstance {
205222
}
206223

207224
private async initializeServerStream(): Promise<void> {
225+
this.log("[S2MetadataStream] Waiting for buffer task to complete");
208226
// Wait for buffer task and all flushes to complete
209227
await this.bufferReaderTask;
210228

229+
this.log(
230+
`[S2MetadataStream] Buffer task complete, performing final flush (${this.pendingFlushes.length} pending chunks)`
231+
);
211232
// Final flush
212233
await this.flush();
213234

235+
this.log(`[S2MetadataStream] Waiting for ${this.flushPromises.length} flush promises`);
214236
// Wait for all pending flushes
215237
await Promise.all(this.flushPromises);
216238

239+
this.log("[S2MetadataStream] All flushes complete, cleaning up");
217240
// Clean up
218241
if (this.flushInterval) {
219242
clearInterval(this.flushInterval);
220243
this.flushInterval = null;
221244
}
245+
246+
this.log("[S2MetadataStream] Stream completed successfully");
222247
}
223248

224249
public async wait(): Promise<void> {
@@ -231,6 +256,18 @@ export class S2MetadataStream<T = any> implements StreamInstance {
231256

232257
// Helper methods
233258

259+
private log(message: string): void {
260+
if (this.debug) {
261+
console.log(message);
262+
}
263+
}
264+
265+
private logError(message: string, error?: any): void {
266+
if (this.debug) {
267+
console.error(message, error);
268+
}
269+
}
270+
234271
private isRetryableError(error: any): boolean {
235272
if (!error) return false;
236273

packages/core/src/v3/waitUntil/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class NoopManager implements WaitUntilManager {
88
// noop
99
}
1010

11-
blockUntilSettled(timeout: number): Promise<void> {
11+
blockUntilSettled(): Promise<void> {
1212
return Promise.resolve();
1313
}
1414

@@ -44,8 +44,8 @@ export class WaitUntilAPI implements WaitUntilManager {
4444
return this.#getManager().register(promise);
4545
}
4646

47-
blockUntilSettled(timeout: number): Promise<void> {
48-
return this.#getManager().blockUntilSettled(timeout);
47+
blockUntilSettled(): Promise<void> {
48+
return this.#getManager().blockUntilSettled();
4949
}
5050

5151
requiresResolving(): boolean {

packages/core/src/v3/waitUntil/manager.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { MaybeDeferredPromise, WaitUntilManager } from "./types.js";
33
export class StandardWaitUntilManager implements WaitUntilManager {
44
private maybeDeferredPromises: Set<MaybeDeferredPromise> = new Set();
55

6+
constructor(private timeoutInMs: number = 60_000) {}
7+
68
reset(): void {
79
this.maybeDeferredPromises.clear();
810
}
@@ -11,7 +13,7 @@ export class StandardWaitUntilManager implements WaitUntilManager {
1113
this.maybeDeferredPromises.add(promise);
1214
}
1315

14-
async blockUntilSettled(timeout: number): Promise<void> {
16+
async blockUntilSettled(): Promise<void> {
1517
if (this.promisesRequringResolving.length === 0) {
1618
return;
1719
}
@@ -22,7 +24,7 @@ export class StandardWaitUntilManager implements WaitUntilManager {
2224

2325
await Promise.race([
2426
Promise.allSettled(promises),
25-
new Promise<void>((resolve, _) => setTimeout(() => resolve(), timeout)),
27+
new Promise<void>((resolve, _) => setTimeout(() => resolve(), this.timeoutInMs)),
2628
]);
2729

2830
this.maybeDeferredPromises.clear();

packages/core/src/v3/waitUntil/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ export type MaybeDeferredPromise = {
55

66
export interface WaitUntilManager {
77
register(promise: MaybeDeferredPromise): void;
8-
blockUntilSettled(timeout: number): Promise<void>;
8+
blockUntilSettled(): Promise<void>;
99
requiresResolving(): boolean;
1010
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1079,7 +1079,7 @@ export class TaskExecutor {
10791079
return this._tracer.startActiveSpan(
10801080
"waitUntil",
10811081
async (span) => {
1082-
return await waitUntil.blockUntilSettled(60_000);
1082+
return await waitUntil.blockUntilSettled();
10831083
},
10841084
{
10851085
attributes: {

0 commit comments

Comments
 (0)