Skip to content

Commit 9bb76d0

Browse files
committed
WIP
1 parent ac27476 commit 9bb76d0

File tree

4 files changed

+97
-68
lines changed

4 files changed

+97
-68
lines changed

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 52 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ export type S2RealtimeStreamsOptions = {
1010
streamPrefix?: string; // defaults to ""
1111

1212
// Read behavior
13-
s2WaitSeconds?: number; // long poll wait for reads (default 60)
14-
sseHeartbeatMs?: number; // ping interval to keep h2 alive (default 25000)
13+
s2WaitSeconds?: number;
1514

1615
flushIntervalMs?: number; // how often to flush buffered chunks (default 200ms)
1716
maxRetries?: number; // max number of retries for failed flushes (default 10)
@@ -37,7 +36,6 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
3736
private readonly streamPrefix: string;
3837

3938
private readonly s2WaitSeconds: number;
40-
private readonly sseHeartbeatMs: number;
4139

4240
private readonly flushIntervalMs: number;
4341
private readonly maxRetries: number;
@@ -52,7 +50,6 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
5250
this.streamPrefix = opts.streamPrefix ?? "";
5351

5452
this.s2WaitSeconds = opts.s2WaitSeconds ?? 60;
55-
this.sseHeartbeatMs = opts.sseHeartbeatMs ?? 25_000;
5653

5754
this.flushIntervalMs = opts.flushIntervalMs ?? 200;
5855
this.maxRetries = opts.maxRetries ?? 10;
@@ -111,68 +108,18 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
111108
lastEventId?: string
112109
): Promise<Response> {
113110
const s2Stream = this.toStreamName(runId, streamId);
114-
const encoder = new TextEncoder();
115-
116-
const startSeq = this.parseLastEventId(lastEventId); // if undefined => from beginning
117-
const readable = new ReadableStream<Uint8Array>({
118-
start: async (controller) => {
119-
let aborted = false;
120-
const onAbort = () => (aborted = true);
121-
signal.addEventListener("abort", onAbort);
122-
123-
const hb = setInterval(() => {
124-
controller.enqueue(encoder.encode(`: ping\n\n`));
125-
}, this.sseHeartbeatMs);
126-
127-
try {
128-
let nextSeq = startSeq ?? 0;
129-
130-
// Live follow via long-poll read (wait=)
131-
// clamp=true ensures starting past-tail doesn't 416; it clamps to tail and waits.
132-
while (!aborted) {
133-
const resp = await this.s2ReadOnce(s2Stream, {
134-
seq_num: nextSeq,
135-
clamp: true,
136-
count: 1000,
137-
wait: this.s2WaitSeconds, // long polling for new data. :contentReference[oaicite:6]{index=6}
138-
});
139-
140-
if (resp.records?.length) {
141-
for (const rec of resp.records) {
142-
const seq = rec.seq_num!;
143-
controller.enqueue(encoder.encode(`id: ${seq}\n`));
144-
const body = rec.body ?? "";
145-
const lines = body.split("\n").filter((l) => l.length > 0);
146-
for (const line of lines) {
147-
controller.enqueue(encoder.encode(`data: ${line}\n`));
148-
}
149-
controller.enqueue(encoder.encode(`\n`));
150-
nextSeq = seq + 1;
151-
}
152-
}
153-
// If no records within wait, loop; heartbeat keeps connection alive.
154-
}
155-
} catch (error) {
156-
this.logger.error("[S2RealtimeStreams][streamResponse] fatal", {
157-
error,
158-
runId,
159-
streamId,
160-
});
161-
controller.error(error);
162-
} finally {
163-
signal.removeEventListener("abort", onAbort);
164-
clearInterval(hb);
165-
}
166-
},
111+
const startSeq = this.parseLastEventId(lastEventId);
112+
113+
// Request SSE stream from S2 and return it directly
114+
const s2Response = await this.s2StreamRecords(s2Stream, {
115+
seq_num: startSeq ?? 0,
116+
clamp: true,
117+
wait: this.s2WaitSeconds, // S2 will keep the connection open and stream new records
118+
signal, // Pass abort signal so S2 connection is cleaned up when client disconnects
167119
});
168120

169-
return new Response(readable, {
170-
headers: {
171-
"Content-Type": "text/event-stream",
172-
"Cache-Control": "no-cache",
173-
Connection: "keep-alive",
174-
},
175-
});
121+
// Return S2's SSE response directly to the client
122+
return s2Response;
176123
}
177124

178125
// ---------- Internals: S2 REST ----------
@@ -209,6 +156,47 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
209156
return data.access_token;
210157
}
211158

159+
private async s2StreamRecords(
160+
stream: string,
161+
opts: {
162+
seq_num?: number;
163+
clamp?: boolean;
164+
wait?: number;
165+
signal?: AbortSignal;
166+
}
167+
): Promise<Response> {
168+
// GET /v1/streams/{stream}/records with Accept: text/event-stream for SSE streaming
169+
const qs = new URLSearchParams();
170+
if (opts.seq_num != null) qs.set("seq_num", String(opts.seq_num));
171+
if (opts.clamp != null) qs.set("clamp", String(opts.clamp));
172+
if (opts.wait != null) qs.set("wait", String(opts.wait));
173+
174+
const res = await fetch(`${this.baseUrl}/streams/${encodeURIComponent(stream)}/records?${qs}`, {
175+
method: "GET",
176+
headers: {
177+
Authorization: `Bearer ${this.token}`,
178+
Accept: "text/event-stream",
179+
"S2-Format": "raw",
180+
},
181+
signal: opts.signal,
182+
});
183+
184+
if (!res.ok) {
185+
const text = await res.text().catch(() => "");
186+
throw new Error(`S2 stream failed: ${res.status} ${res.statusText} ${text}`);
187+
}
188+
189+
const headers = new Headers(res.headers);
190+
headers.set("X-Stream-Version", "v2");
191+
headers.set("Access-Control-Expose-Headers", "*");
192+
193+
return new Response(res.body, {
194+
headers,
195+
status: res.status,
196+
statusText: res.statusText,
197+
});
198+
}
199+
212200
private async s2ReadOnce(
213201
stream: string,
214202
opts: {

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export function getRealtimeStreamInstance(
4343
logLevel: env.REALTIME_STREAMS_S2_LOG_LEVEL,
4444
flushIntervalMs: env.REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS,
4545
maxRetries: env.REALTIME_STREAMS_S2_MAX_RETRIES,
46+
s2WaitSeconds: env.REALTIME_STREAMS_S2_WAIT_SECONDS,
4647
});
4748
}
4849

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,14 @@ export class SSEStreamSubscription implements StreamSubscription {
216216
throw new Error("No response body");
217217
}
218218

219+
const responseHeaders = Object.fromEntries(response.headers);
220+
221+
console.log("stream response headers", responseHeaders);
222+
223+
const streamVersion = response.headers.get("X-Stream-Version") ?? "v1";
224+
225+
console.log("stream version", streamVersion);
226+
219227
// Reset retry count on successful connection
220228
this.retryCount = 0;
221229

@@ -225,11 +233,25 @@ export class SSEStreamSubscription implements StreamSubscription {
225233
.pipeThrough(
226234
new TransformStream({
227235
transform: (chunk, chunkController) => {
228-
// Track the last event ID for resume support
229-
if (chunk.id) {
230-
this.lastEventId = chunk.id;
236+
if (streamVersion === "v1") {
237+
// Track the last event ID for resume support
238+
if (chunk.id) {
239+
this.lastEventId = chunk.id;
240+
}
241+
chunkController.enqueue(safeParseJSON(chunk.data));
242+
} else {
243+
if (chunk.event === "batch") {
244+
const data = safeParseJSON(chunk.data) as {
245+
records: Array<{ body: string; seq_num: number; timestamp: number }>;
246+
};
247+
248+
for (const record of data.records) {
249+
this.lastEventId = record.seq_num.toString();
250+
251+
chunkController.enqueue(safeParseJSON(record.body));
252+
}
253+
}
231254
}
232-
chunkController.enqueue(safeParseJSON(chunk.data));
233255
},
234256
})
235257
);

references/realtime-streams/src/app/page.tsx

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ export default function Home() {
1818
<TriggerButton scenario="slow-steady">Slow Steady Stream (5 min)</TriggerButton>
1919
</div>
2020

21+
<div className="flex flex-col gap-4">
22+
<TriggerButton useDurableStreams={true} scenario="markdown">
23+
Markdown Stream (Durable)
24+
</TriggerButton>
25+
<TriggerButton useDurableStreams={true} scenario="continuous">
26+
Continuous Stream (Durable)
27+
</TriggerButton>
28+
<TriggerButton useDurableStreams={true} scenario="burst">
29+
Burst Stream (Durable)
30+
</TriggerButton>
31+
<TriggerButton useDurableStreams={true} scenario="stall">
32+
Stall Stream (3 min) (Durable)
33+
</TriggerButton>
34+
<TriggerButton useDurableStreams={true} scenario="slow-steady">
35+
Slow Steady Stream (5 min) (Durable)
36+
</TriggerButton>
37+
</div>
38+
2139
<div className="mt-8 pt-8 border-t border-gray-300">
2240
<h2 className="text-xl font-semibold mb-4">Performance Testing</h2>
2341
<TriggerButton scenario="performance" redirect="/performance">

0 commit comments

Comments
 (0)