Skip to content

Commit 8ea2b20

Browse files
committed
feat(ai): update streaming response handling to support UI Message Stream Protocol with JSON payloads
1 parent 52a9869 commit 8ea2b20

File tree

3 files changed

+98
-60
lines changed

3 files changed

+98
-60
lines changed

packages/runtime/src/dispatcher-plugin.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -414,24 +414,22 @@ export function createDispatcherPlugin(config: DispatcherPluginConfig = {}): Plu
414414
if (result.stream && result.events) {
415415
// SSE streaming response
416416
res.status(result.status);
417-
res.header('Content-Type', result.vercelDataStream
418-
? 'text/plain; charset=utf-8'
419-
: 'text/event-stream');
420-
res.header('Cache-Control', 'no-cache');
421-
res.header('Connection', 'keep-alive');
422-
423-
// Write the stream — IHttpServer implementations
424-
// may or may not support raw write. Fall back to
425-
// collecting and sending JSON if write is unavailable.
417+
418+
// Apply headers from the route result if available
419+
if (result.headers) {
420+
for (const [k, v] of Object.entries(result.headers)) {
421+
res.header(k, v);
422+
}
423+
} else {
424+
res.header('Content-Type', 'text/event-stream');
425+
res.header('Cache-Control', 'no-cache');
426+
res.header('Connection', 'keep-alive');
427+
}
428+
429+
// Write the stream — events are pre-encoded SSE strings
426430
if (typeof res.write === 'function' && typeof res.end === 'function') {
427431
for await (const event of result.events) {
428-
if (result.vercelDataStream) {
429-
// Events are already TextStreamPart — need encoding
430-
// Import is dynamic to avoid hard dep on service-ai
431-
res.write(typeof event === 'string' ? event : JSON.stringify(event) + '\n');
432-
} else {
433-
res.write(`data: ${JSON.stringify(event)}\n\n`);
434-
}
432+
res.write(typeof event === 'string' ? event : `data: ${JSON.stringify(event)}\n\n`);
435433
}
436434
res.end();
437435
} else {

packages/services/service-ai/src/routes/ai-routes.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,25 @@ export function buildAIRoutes(
250250
const wantStream = body.stream !== false;
251251

252252
if (wantStream) {
253-
// Vercel Data Stream Protocol (SSE)
253+
// UI Message Stream Protocol (SSE with JSON payloads)
254254
try {
255255
if (!aiService.streamChat) {
256256
return { status: 501, body: { error: 'Streaming is not supported by the configured AI service' } };
257257
}
258258
const events = aiService.streamChat(finalMessages, resolvedOptions as any);
259-
return { status: 200, stream: true, vercelDataStream: true, events: encodeVercelDataStream(events) };
259+
return {
260+
status: 200,
261+
stream: true,
262+
vercelDataStream: true,
263+
contentType: 'text/event-stream',
264+
headers: {
265+
'Content-Type': 'text/event-stream',
266+
'Cache-Control': 'no-cache',
267+
'Connection': 'keep-alive',
268+
'x-vercel-ai-ui-message-stream': 'v1',
269+
},
270+
events: encodeVercelDataStream(events),
271+
};
260272
} catch (err) {
261273
logger.error('[AI Route] /chat stream error', err instanceof Error ? err : undefined);
262274
return { status: 500, body: { error: 'Internal AI service error' } };
Lines changed: 70 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,88 @@
11
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
22

33
/**
4-
* Vercel Data Stream Encoder
4+
* Vercel AI SDK v6 — UI Message Stream Encoder
55
*
66
* Converts `AsyncIterable<TextStreamPart<ToolSet>>` (the internal ObjectStack
7-
* streaming format, aligned with Vercel AI SDK types) into the Vercel AI SDK
8-
* **Data Stream Protocol** wire format.
7+
* streaming format) into the Vercel AI SDK v6 **UI Message Stream Protocol**.
98
*
10-
* Each frame is a single line: `<type-code>:<JSON>\n`
9+
* Wire format: Server-Sent Events (SSE) with JSON payloads.
10+
* `data: {"type":"text-delta","id":"0","delta":"Hello"}\n\n`
1111
*
12-
* | Code | Description | Payload shape |
13-
* |:-----|:-------------------------|:-------------------------------------------------------------|
14-
* | `0` | Text delta | `"<text>"` |
15-
* | `9` | Tool call (full) | `{"toolCallId","toolName","args"}` |
16-
* | `b` | Tool call start | `{"toolCallId","toolName"}` |
17-
* | `c` | Tool call delta | `{"toolCallId","argsTextDelta"}` |
18-
* | `a` | Tool result | `{"toolCallId","result"}` |
19-
* | `d` | Finish (message-level) | `{"finishReason","usage"?}` |
20-
* | `e` | Step finish | `{"finishReason","usage"?,"isContinued"?}` |
12+
* The client-side `DefaultChatTransport` from `ai` v6 uses
13+
* `parseJsonEventStream` to parse these SSE events.
2114
*
2215
* @see https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol
2316
*/
2417

2518
import type { TextStreamPart, ToolSet } from 'ai';
2619

20+
// ── SSE helpers ──────────────────────────────────────────────────────
21+
22+
function sse(data: object): string {
23+
return `data: ${JSON.stringify(data)}\n\n`;
24+
}
25+
2726
// ── Public API ──────────────────────────────────────────────────────
2827

2928
/**
30-
* Encode a single `TextStreamPart` event into its Vercel Data Stream frame(s).
29+
* Encode a single `TextStreamPart` event into SSE-formatted UI Message
30+
* Stream chunk(s).
3131
*
32-
* Returns an empty string for event types that have no wire-format mapping
33-
* (e.g. internal-only events).
32+
* Returns an empty string for event types that have no wire-format mapping.
3433
*/
3534
export function encodeStreamPart(part: TextStreamPart<ToolSet>): string {
3635
switch (part.type) {
3736
// ── Text ──────────────────────────────────────────────────
3837
case 'text-delta':
39-
return `0:${JSON.stringify(part.text)}\n`;
38+
return sse({ type: 'text-delta', id: '0', delta: part.text });
4039

4140
// ── Tool calling ─────────────────────────────────────────
42-
case 'tool-call':
43-
return `9:${JSON.stringify({
44-
toolCallId: part.toolCallId,
45-
toolName: part.toolName,
46-
args: part.input,
47-
})}\n`;
48-
4941
case 'tool-input-start':
50-
return `b:${JSON.stringify({
42+
return sse({
43+
type: 'tool-input-start',
5144
toolCallId: part.id,
5245
toolName: part.toolName,
53-
})}\n`;
46+
});
5447

5548
case 'tool-input-delta':
56-
return `c:${JSON.stringify({
49+
return sse({
50+
type: 'tool-input-delta',
5751
toolCallId: part.id,
58-
argsTextDelta: part.delta,
59-
})}\n`;
52+
inputTextDelta: part.delta,
53+
});
54+
55+
case 'tool-call':
56+
return sse({
57+
type: 'tool-input-available',
58+
toolCallId: part.toolCallId,
59+
toolName: part.toolName,
60+
input: part.input,
61+
});
6062

6163
case 'tool-result':
62-
return `a:${JSON.stringify({
64+
return sse({
65+
type: 'tool-output-available',
6366
toolCallId: part.toolCallId,
64-
result: part.output,
65-
})}\n`;
67+
output: part.output,
68+
});
6669

6770
// ── Finish / Step ────────────────────────────────────────
71+
case 'finish-step':
72+
return sse({ type: 'finish-step' });
73+
6874
case 'finish':
69-
return `d:${JSON.stringify({
75+
return sse({
76+
type: 'finish',
7077
finishReason: part.finishReason,
71-
usage: part.totalUsage ?? undefined,
72-
})}\n`;
78+
});
7379

74-
case 'finish-step':
75-
return `e:${JSON.stringify({
76-
finishReason: part.finishReason,
77-
usage: part.usage ?? undefined,
78-
})}\n`;
80+
// ── Error ────────────────────────────────────────────────
81+
case 'error':
82+
return sse({
83+
type: 'error',
84+
errorText: String(part.error),
85+
});
7986

8087
// ── Unhandled types (silently skip) ──────────────────────
8188
default:
@@ -85,17 +92,38 @@ export function encodeStreamPart(part: TextStreamPart<ToolSet>): string {
8592

8693
/**
8794
* Transform an `AsyncIterable<TextStreamPart>` into an `AsyncIterable<string>`
88-
* where each yielded string is a Vercel Data Stream frame.
95+
* where each yielded string is an SSE-formatted UI Message Stream chunk.
8996
*
90-
* Empty frames (from unmapped event types) are silently dropped.
97+
* Emits the required `start`, `start-step`, `text-start` preamble and
98+
* `text-end`, `finish-step`, `finish`, `[DONE]` postamble automatically.
9199
*/
92100
export async function* encodeVercelDataStream(
93101
events: AsyncIterable<TextStreamPart<ToolSet>>,
94102
): AsyncIterable<string> {
103+
// Preamble
104+
yield sse({ type: 'start' });
105+
yield sse({ type: 'start-step' });
106+
yield sse({ type: 'text-start', id: '0' });
107+
108+
let finishReason = 'stop';
109+
95110
for await (const part of events) {
111+
if (part.type === 'finish') {
112+
finishReason = part.finishReason ?? 'stop';
113+
}
96114
const frame = encodeStreamPart(part);
97115
if (frame) {
98116
yield frame;
99117
}
100118
}
119+
120+
// Postamble — text-end + finish-step + finish are already emitted by
121+
// encodeStreamPart when the corresponding parts arrive from the LLM.
122+
// However, we always need text-end and the [DONE] sentinel.
123+
yield sse({ type: 'text-end', id: '0' });
124+
// finish-step and finish may have already been emitted; emit them
125+
// again only as safeguards — the client handles duplicates gracefully.
126+
yield sse({ type: 'finish-step' });
127+
yield sse({ type: 'finish', finishReason });
128+
yield 'data: [DONE]\n\n';
101129
}

0 commit comments

Comments
 (0)