Skip to content

Commit 7d35a5f

Browse files
committed
Return stream parts from SSE class
1 parent 29feeff commit 7d35a5f

File tree

5 files changed

+74
-27
lines changed
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey
  • packages
  • references/realtime-streams/src/trigger

5 files changed

+74
-27
lines changed

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { LoaderFunctionArgs } from "@remix-run/server-runtime";
2-
import { SSEStreamSubscription } from "@trigger.dev/core/v3";
2+
import { SSEStreamPart, SSEStreamSubscription } from "@trigger.dev/core/v3";
33
import { useEffect, useRef, useState } from "react";
44
import { Paragraph } from "~/components/primitives/Paragraph";
55
import { $replica } from "~/db.server";
@@ -12,6 +12,7 @@ import { cn } from "~/utils/cn";
1212
import { v3RunStreamParamsSchema } from "~/utils/pathBuilder";
1313

1414
type StreamChunk = {
15+
id: string;
1516
data: unknown;
1617
timestamp: number;
1718
};
@@ -261,7 +262,7 @@ function useRealtimeStream(resourcePath: string, startIndex?: number) {
261262

262263
useEffect(() => {
263264
const abortController = new AbortController();
264-
let reader: ReadableStreamDefaultReader<unknown> | null = null;
265+
let reader: ReadableStreamDefaultReader<SSEStreamPart<unknown>> | null = null;
265266

266267
async function connectAndConsume() {
267268
try {
@@ -288,8 +289,9 @@ function useRealtimeStream(resourcePath: string, startIndex?: number) {
288289
setChunks((prev) => [
289290
...prev,
290291
{
291-
data: value,
292-
timestamp: Date.now(),
292+
id: value.id,
293+
data: value.chunk,
294+
timestamp: value.timestamp,
293295
},
294296
]);
295297
}

packages/core/src/v3/apiClient/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import {
7474
TaskRunShape,
7575
runShapeStream,
7676
RealtimeRunSkipColumns,
77+
type SSEStreamPart,
7778
} from "./runStream.js";
7879
import {
7980
CreateEnvironmentVariableParams,
@@ -142,6 +143,7 @@ export type {
142143
RunStreamCallback,
143144
RunSubscription,
144145
TaskRunShape,
146+
SSEStreamPart,
145147
};
146148

147149
export * from "./getBranch.js";
@@ -1095,7 +1097,13 @@ export class ApiClient {
10951097

10961098
const stream = await subscription.subscribe();
10971099

1098-
return stream as AsyncIterableStream<T>;
1100+
return stream.pipeThrough(
1101+
new TransformStream<SSEStreamPart, T>({
1102+
transform(chunk, controller) {
1103+
controller.enqueue(chunk.chunk as T);
1104+
},
1105+
})
1106+
);
10991107
}
11001108

11011109
async createStream(

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { EventSourceParserStream } from "eventsource-parser/stream";
1+
import { EventSourceMessage, EventSourceParserStream } from "eventsource-parser/stream";
22
import { DeserializedJson } from "../../schemas/json.js";
33
import { createJsonErrorObject } from "../errors.js";
4-
import {
5-
RunStatus,
6-
SubscribeRealtimeStreamChunkRawShape,
7-
SubscribeRunRawShape,
8-
} from "../schemas/api.js";
4+
import { RunStatus, SubscribeRunRawShape } from "../schemas/api.js";
95
import { SerializedError } from "../schemas/common.js";
6+
import {
7+
AsyncIterableStream,
8+
createAsyncIterableReadable,
9+
} from "../streams/asyncIterableStream.js";
1010
import { AnyRunTypes, AnyTask, InferRunTypes } from "../types/tasks.js";
1111
import { getEnvVar } from "../utils/getEnv.js";
1212
import {
@@ -16,11 +16,7 @@ import {
1616
} from "../utils/ioSerialization.js";
1717
import { ApiError } from "./errors.js";
1818
import { ApiClient } from "./index.js";
19-
import { LineTransformStream, zodShapeStream } from "./stream.js";
20-
import {
21-
AsyncIterableStream,
22-
createAsyncIterableReadable,
23-
} from "../streams/asyncIterableStream.js";
19+
import { zodShapeStream } from "./stream.js";
2420

2521
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
2622
? {
@@ -157,7 +153,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
157153

158154
// First, define interfaces for the stream handling
159155
export interface StreamSubscription {
160-
subscribe(): Promise<ReadableStream<unknown>>;
156+
subscribe(): Promise<ReadableStream<SSEStreamPart<unknown>>>;
161157
}
162158

163159
export type CreateStreamSubscriptionOptions = {
@@ -176,6 +172,12 @@ export interface StreamSubscriptionFactory {
176172
): StreamSubscription;
177173
}
178174

175+
export type SSEStreamPart<TChunk = unknown> = {
176+
id: string;
177+
chunk: TChunk;
178+
timestamp: number;
179+
};
180+
179181
// Real implementation for production
180182
export class SSEStreamSubscription implements StreamSubscription {
181183
private lastEventId: string | undefined;
@@ -197,7 +199,7 @@ export class SSEStreamSubscription implements StreamSubscription {
197199
this.lastEventId = options.lastEventId;
198200
}
199201

200-
async subscribe(): Promise<ReadableStream<unknown>> {
202+
async subscribe(): Promise<ReadableStream<SSEStreamPart>> {
201203
const self = this;
202204

203205
return new ReadableStream({
@@ -210,7 +212,9 @@ export class SSEStreamSubscription implements StreamSubscription {
210212
});
211213
}
212214

213-
private async connectStream(controller: ReadableStreamDefaultController): Promise<void> {
215+
private async connectStream(
216+
controller: ReadableStreamDefaultController<SSEStreamPart>
217+
): Promise<void> {
214218
try {
215219
const headers: Record<string, string> = {
216220
Accept: "text/event-stream",
@@ -259,14 +263,21 @@ export class SSEStreamSubscription implements StreamSubscription {
259263
.pipeThrough(new TextDecoderStream())
260264
.pipeThrough(new EventSourceParserStream())
261265
.pipeThrough(
262-
new TransformStream({
266+
new TransformStream<EventSourceMessage, SSEStreamPart>({
263267
transform: (chunk, chunkController) => {
264268
if (streamVersion === "v1") {
265269
// Track the last event ID for resume support
266270
if (chunk.id) {
267271
this.lastEventId = chunk.id;
268272
}
269-
chunkController.enqueue(safeParseJSON(chunk.data));
273+
274+
const timestamp = parseRedisStreamIdTimestamp(chunk.id);
275+
276+
chunkController.enqueue({
277+
id: chunk.id ?? "unknown",
278+
chunk: safeParseJSON(chunk.data),
279+
timestamp,
280+
});
270281
} else {
271282
if (chunk.event === "batch") {
272283
const data = safeParseJSON(chunk.data) as {
@@ -276,7 +287,11 @@ export class SSEStreamSubscription implements StreamSubscription {
276287
for (const record of data.records) {
277288
this.lastEventId = record.seq_num.toString();
278289

279-
chunkController.enqueue(safeParseJSON(record.body));
290+
chunkController.enqueue({
291+
id: record.seq_num.toString(),
292+
chunk: safeParseJSON(record.body),
293+
timestamp: record.timestamp,
294+
});
280295
}
281296
}
282297
}
@@ -490,7 +505,7 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
490505
transform(chunk, controller) {
491506
controller.enqueue({
492507
type: streamKey,
493-
chunk: chunk as TStreams[typeof streamKey],
508+
chunk: chunk.chunk as TStreams[typeof streamKey],
494509
run,
495510
});
496511
},
@@ -740,3 +755,17 @@ function getStreamsFromRunShape(run: AnyRunShape): string[] {
740755

741756
return run.realtimeStreams;
742757
}
758+
759+
// Redis stream IDs are in the format: <timestamp>-<sequence>
760+
function parseRedisStreamIdTimestamp(id?: string): number {
761+
if (!id) {
762+
return Date.now();
763+
}
764+
765+
const timestamp = parseInt(id.split("-")[0] as string);
766+
if (isNaN(timestamp)) {
767+
return Date.now();
768+
}
769+
770+
return timestamp;
771+
}

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@ export type AppendStreamOptions = {
1919
requestOptions?: ApiRequestOptions;
2020
};
2121

22+
export type AppendStreamResult<T> = {
23+
stream: AsyncIterableStream<T>;
24+
waitUntilComplete: () => Promise<void>;
25+
};
26+
2227
async function append<T>(
2328
key: string,
2429
value: AsyncIterable<T> | ReadableStream<T>,
2530
options?: AppendStreamOptions
26-
): Promise<RealtimeStreamInstance<T>> {
31+
): Promise<AppendStreamResult<T>> {
2732
const runId = getRunIdForOptions(options);
2833

2934
if (!runId) {
@@ -64,7 +69,10 @@ async function append<T>(
6469
span.end();
6570
});
6671

67-
return instance;
72+
return {
73+
stream: instance.stream,
74+
waitUntilComplete: () => instance.wait(),
75+
};
6876
} catch (error) {
6977
// if the error is a signal abort error, we need to end the span but not record an exception
7078
if (error instanceof Error && error.name === "AbortError") {

references/realtime-streams/src/trigger/streams.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export const streamsTask = task({
113113

114114
const mockStream = createStreamFromGenerator(generator);
115115

116-
const { wait } = await streams.append("stream", mockStream);
116+
const { waitUntilComplete } = await streams.append("stream", mockStream);
117117

118118
await setTimeout(1000);
119119

@@ -128,7 +128,7 @@ export const streamsTask = task({
128128
tokenCount++;
129129
}
130130

131-
await wait();
131+
await waitUntilComplete();
132132

133133
logger.info("Stream completed", { scenario, tokenCount });
134134

0 commit comments

Comments
 (0)