Skip to content

Commit f7c3de0

Browse files
authored
fix(streaming): handle multi-byte UTF-8 chars split across chunks (#3083)
1 parent 2ec9b7f commit f7c3de0

File tree

6 files changed

+335
-14
lines changed

6 files changed

+335
-14
lines changed

apps/sim/lib/core/utils/response-format.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
22

33
const logger = createLogger('ResponseFormatUtils')
44

5-
// Type definitions for component data structures
65
export interface Field {
76
name: string
87
type: string
Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { describe, expect, it, vi } from 'vitest'
5+
import { encodeSSE, readSSEStream, SSE_HEADERS } from '@/lib/core/utils/sse'
6+
7+
/**
 * Builds a ReadableStream that delivers the given byte chunks in order.
 *
 * Each `pull` enqueues exactly one chunk, so a reader observes the same chunk
 * boundaries that were passed in; once all chunks are delivered the stream is
 * closed.
 *
 * @param chunks - Byte chunks to emit, in delivery order.
 * @returns A stream that yields every chunk once and then closes.
 */
function createStreamFromChunks(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
  // Position of the next chunk to hand to the consumer.
  let index = 0
  return new ReadableStream({
    pull(controller) {
      if (index < chunks.length) {
        controller.enqueue(chunks[index])
        index++
      } else {
        controller.close()
      }
    },
  })
}
20+
21+
/**
 * Serializes `data` as a single SSE `data:` event and returns its UTF-8 bytes.
 *
 * The payload is JSON-encoded and terminated by a blank line (`\n\n`).
 *
 * @param data - Value to JSON-encode as the event payload.
 * @returns The UTF-8 encoded bytes of `data: <json>\n\n`.
 */
function createSSEChunk(data: object): Uint8Array {
  return new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`)
}
24+
25+
// Pins the exact header values exported as SSE_HEADERS.
describe('SSE_HEADERS', () => {
  it.concurrent('should have correct Content-Type', () => {
    expect(SSE_HEADERS['Content-Type']).toBe('text/event-stream')
  })

  it.concurrent('should have correct Cache-Control', () => {
    expect(SSE_HEADERS['Cache-Control']).toBe('no-cache')
  })

  it.concurrent('should have Connection keep-alive', () => {
    expect(SSE_HEADERS.Connection).toBe('keep-alive')
  })

  it.concurrent('should disable buffering', () => {
    expect(SSE_HEADERS['X-Accel-Buffering']).toBe('no')
  })
})
42+
43+
// Verifies that encodeSSE produces `data: <json>\n\n` bytes for various payloads.
describe('encodeSSE', () => {
  it.concurrent('should encode data as SSE format', () => {
    const data = { chunk: 'hello' }
    const result = encodeSSE(data)
    const decoded = new TextDecoder().decode(result)
    expect(decoded).toBe('data: {"chunk":"hello"}\n\n')
  })

  it.concurrent('should handle complex objects', () => {
    const data = { chunk: 'test', nested: { value: 123 } }
    const result = encodeSSE(data)
    const decoded = new TextDecoder().decode(result)
    expect(decoded).toBe('data: {"chunk":"test","nested":{"value":123}}\n\n')
  })

  it.concurrent('should handle strings with special characters', () => {
    // Non-ASCII text must survive the encode/decode round trip unchanged.
    const data = { chunk: 'Hello, 世界! 🌍' }
    const result = encodeSSE(data)
    const decoded = new TextDecoder().decode(result)
    expect(decoded).toContain('Hello, 世界! 🌍')
  })
})
65+
66+
describe('readSSEStream', () => {
67+
// Two content events followed by a `done` event: only the `chunk` values are joined.
it.concurrent('should accumulate content from chunks', async () => {
  const chunks = [
    createSSEChunk({ chunk: 'Hello' }),
    createSSEChunk({ chunk: ' World' }),
    createSSEChunk({ done: true }),
  ]
  const stream = createStreamFromChunks(chunks)

  const result = await readSSEStream(stream)
  expect(result).toBe('Hello World')
})
78+
79+
// onChunk receives each individual chunk value, in arrival order.
it.concurrent('should call onChunk callback for each chunk', async () => {
  const onChunk = vi.fn()
  const chunks = [createSSEChunk({ chunk: 'A' }), createSSEChunk({ chunk: 'B' })]
  const stream = createStreamFromChunks(chunks)

  await readSSEStream(stream, { onChunk })

  expect(onChunk).toHaveBeenCalledTimes(2)
  expect(onChunk).toHaveBeenNthCalledWith(1, 'A')
  expect(onChunk).toHaveBeenNthCalledWith(2, 'B')
})
90+
91+
// onAccumulated receives the running concatenation after each chunk.
it.concurrent('should call onAccumulated callback with accumulated content', async () => {
  const onAccumulated = vi.fn()
  const chunks = [createSSEChunk({ chunk: 'A' }), createSSEChunk({ chunk: 'B' })]
  const stream = createStreamFromChunks(chunks)

  await readSSEStream(stream, { onAccumulated })

  expect(onAccumulated).toHaveBeenCalledTimes(2)
  expect(onAccumulated).toHaveBeenNthCalledWith(1, 'A')
  expect(onAccumulated).toHaveBeenNthCalledWith(2, 'AB')
})
102+
103+
// The literal `[DONE]` sentinel is not JSON and must not contribute to the result.
it.concurrent('should skip [DONE] messages', async () => {
  const encoder = new TextEncoder()
  const chunks = [createSSEChunk({ chunk: 'content' }), encoder.encode('data: [DONE]\n\n')]
  const stream = createStreamFromChunks(chunks)

  const result = await readSSEStream(stream)
  expect(result).toBe('content')
})
111+
112+
// Events carrying an `error` field are ignored; later content is still collected.
it.concurrent('should skip lines with error field', async () => {
  const chunks = [
    createSSEChunk({ error: 'Something went wrong' }),
    createSSEChunk({ chunk: 'valid content' }),
  ]
  const stream = createStreamFromChunks(chunks)

  const result = await readSSEStream(stream)
  expect(result).toBe('valid content')
})
122+
123+
// A signal that is already aborted before reading starts yields an empty result.
it.concurrent('should handle abort signal', async () => {
  const controller = new AbortController()
  controller.abort()

  const chunks = [createSSEChunk({ chunk: 'content' })]
  const stream = createStreamFromChunks(chunks)

  const result = await readSSEStream(stream, { signal: controller.signal })
  expect(result).toBe('')
})
133+
134+
// A `data:` line whose payload is not valid JSON is dropped without failing the read.
it.concurrent('should skip unparseable lines', async () => {
  const encoder = new TextEncoder()
  const chunks = [encoder.encode('data: invalid-json\n\n'), createSSEChunk({ chunk: 'valid' })]
  const stream = createStreamFromChunks(chunks)

  const result = await readSSEStream(stream)
  expect(result).toBe('valid')
})
142+
143+
// Exercises readSSEStream with network chunks whose boundaries fall inside
// multi-byte UTF-8 sequences or inside the SSE `\n\n` delimiter.
describe('multi-byte UTF-8 character handling', () => {
  it.concurrent('should handle Turkish characters split across chunks', async () => {
    const text = 'Merhaba dünya! Öğretmen şarkı söyledi.'
    const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
    const bytes = new TextEncoder().encode(fullData)

    // Cut the byte stream in half, regardless of character boundaries.
    const splitPoint = Math.floor(bytes.length / 2)
    const chunk1 = bytes.slice(0, splitPoint)
    const chunk2 = bytes.slice(splitPoint)

    const stream = createStreamFromChunks([chunk1, chunk2])
    const result = await readSSEStream(stream)
    expect(result).toBe(text)
  })

  it.concurrent('should handle emoji split across chunks', async () => {
    const text = 'Hello 🚀 World 🌍 Test 🎯'
    const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
    const bytes = new TextEncoder().encode(fullData)

    // Byte offset of the first emoji, then cut two bytes into its 4-byte sequence.
    const emojiIndex = fullData.indexOf('🚀')
    const byteOffset = new TextEncoder().encode(fullData.slice(0, emojiIndex)).length
    const splitPoint = byteOffset + 2

    const chunk1 = bytes.slice(0, splitPoint)
    const chunk2 = bytes.slice(splitPoint)

    const stream = createStreamFromChunks([chunk1, chunk2])
    const result = await readSSEStream(stream)
    expect(result).toBe(text)
  })

  it.concurrent('should handle CJK characters split across chunks', async () => {
    const text = '你好世界!日本語テスト。한국어도 됩니다.'
    const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
    const bytes = new TextEncoder().encode(fullData)

    // Three roughly equal byte slices.
    const third = Math.floor(bytes.length / 3)
    const chunk1 = bytes.slice(0, third)
    const chunk2 = bytes.slice(third, third * 2)
    const chunk3 = bytes.slice(third * 2)

    const stream = createStreamFromChunks([chunk1, chunk2, chunk3])
    const result = await readSSEStream(stream)
    expect(result).toBe(text)
  })

  it.concurrent('should handle mixed multi-byte content split at byte boundaries', async () => {
    const text = 'Ö is Turkish, 中 is Chinese, 🎉 is emoji'
    const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
    const bytes = new TextEncoder().encode(fullData)

    // Deliver the payload in fixed 3-byte slices so cuts land at arbitrary
    // positions relative to 2-, 3- and 4-byte characters.
    const chunks: Uint8Array[] = []
    for (let i = 0; i < bytes.length; i += 3) {
      chunks.push(bytes.slice(i, Math.min(i + 3, bytes.length)))
    }

    const stream = createStreamFromChunks(chunks)
    const result = await readSSEStream(stream)
    expect(result).toBe(text)
  })

  it.concurrent('should handle SSE message split across chunks', async () => {
    const encoder = new TextEncoder()
    const message1 = { chunk: 'First' }
    const message2 = { chunk: 'Second' }

    const fullText = `data: ${JSON.stringify(message1)}\n\ndata: ${JSON.stringify(message2)}\n\n`
    const bytes = encoder.encode(fullText)

    // Cut between the two newlines of the first `\n\n` delimiter.
    const delimiterIndex = fullText.indexOf('\n\n') + 1
    const byteOffset = encoder.encode(fullText.slice(0, delimiterIndex)).length

    const chunk1 = bytes.slice(0, byteOffset)
    const chunk2 = bytes.slice(byteOffset)

    const stream = createStreamFromChunks([chunk1, chunk2])
    const result = await readSSEStream(stream)
    expect(result).toBe('FirstSecond')
  })

  it.concurrent('should handle 2-byte UTF-8 character (Ö) split at byte boundary', async () => {
    const text = 'AÖB'
    const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
    const bytes = new TextEncoder().encode(fullData)

    // Locate the character in the serialized payload itself. Searching for the
    // first '"' would find the quote that opens the "chunk" key, which places
    // the split inside ASCII text and never inside the multi-byte sequence.
    const textStart = fullData.indexOf('Ö')
    const byteOffset = new TextEncoder().encode(fullData.slice(0, textStart)).length

    const chunk1 = bytes.slice(0, byteOffset + 1)
    const chunk2 = bytes.slice(byteOffset + 1)

    // Sanity check: chunk2 must begin with a UTF-8 continuation byte (10xxxxxx),
    // proving the cut really falls inside the character.
    expect((chunk2[0] ?? 0) & 0xc0).toBe(0x80)

    const stream = createStreamFromChunks([chunk1, chunk2])
    const result = await readSSEStream(stream)
    expect(result).toBe(text)
  })

  it.concurrent(
    'should handle 3-byte UTF-8 character (中) split at byte boundaries',
    async () => {
      const text = 'A中B'
      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
      const bytes = new TextEncoder().encode(fullData)

      // Locate the character in the serialized payload itself (not relative to
      // the first '"', which belongs to the "chunk" key).
      const textStart = fullData.indexOf('中')
      const byteOffset = new TextEncoder().encode(fullData.slice(0, textStart)).length

      // One chunk boundary after each of the first two bytes of the character.
      const chunk1 = bytes.slice(0, byteOffset + 1)
      const chunk2 = bytes.slice(byteOffset + 1, byteOffset + 2)
      const chunk3 = bytes.slice(byteOffset + 2)

      // Sanity check: both later chunks begin with continuation bytes.
      expect((chunk2[0] ?? 0) & 0xc0).toBe(0x80)
      expect((chunk3[0] ?? 0) & 0xc0).toBe(0x80)

      const stream = createStreamFromChunks([chunk1, chunk2, chunk3])
      const result = await readSSEStream(stream)
      expect(result).toBe(text)
    }
  )

  it.concurrent(
    'should handle 4-byte UTF-8 character (🚀) split at byte boundaries',
    async () => {
      const text = 'A🚀B'
      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
      const bytes = new TextEncoder().encode(fullData)

      // Locate the character in the serialized payload itself (not relative to
      // the first '"', which belongs to the "chunk" key).
      const textStart = fullData.indexOf('🚀')
      const byteOffset = new TextEncoder().encode(fullData.slice(0, textStart)).length

      // One chunk boundary after each of the first three bytes of the character.
      const chunk1 = bytes.slice(0, byteOffset + 1)
      const chunk2 = bytes.slice(byteOffset + 1, byteOffset + 2)
      const chunk3 = bytes.slice(byteOffset + 2, byteOffset + 3)
      const chunk4 = bytes.slice(byteOffset + 3)

      // Sanity check: every later chunk begins with a continuation byte.
      expect((chunk2[0] ?? 0) & 0xc0).toBe(0x80)
      expect((chunk3[0] ?? 0) & 0xc0).toBe(0x80)
      expect((chunk4[0] ?? 0) & 0xc0).toBe(0x80)

      const stream = createStreamFromChunks([chunk1, chunk2, chunk3, chunk4])
      const result = await readSSEStream(stream)
      expect(result).toBe(text)
    }
  )
})
281+
282+
// Covers how readSSEStream handles event boundaries relative to chunk boundaries.
describe('SSE message buffering', () => {
  it.concurrent('should handle incomplete SSE message waiting for more data', async () => {
    const encoder = new TextEncoder()

    // A single event whose JSON payload is cut in the middle of a key.
    const chunk1 = encoder.encode('data: {"chu')
    const chunk2 = encoder.encode('nk":"hello"}\n\n')

    const stream = createStreamFromChunks([chunk1, chunk2])
    const result = await readSSEStream(stream)
    expect(result).toBe('hello')
  })

  it.concurrent('should handle multiple complete messages in one chunk', async () => {
    const encoder = new TextEncoder()

    // Three complete events delivered in one read.
    const multiMessage = 'data: {"chunk":"A"}\n\ndata: {"chunk":"B"}\n\ndata: {"chunk":"C"}\n\n'
    const chunk = encoder.encode(multiMessage)

    const stream = createStreamFromChunks([chunk])
    const result = await readSSEStream(stream)
    expect(result).toBe('ABC')
  })

  it.concurrent('should handle message that ends exactly at chunk boundary', async () => {
    // Each chunk is exactly one complete event, delimiter included.
    const chunks = [createSSEChunk({ chunk: 'First' }), createSSEChunk({ chunk: 'Second' })]
    const stream = createStreamFromChunks(chunks)

    const result = await readSSEStream(stream)
    expect(result).toBe('FirstSecond')
  })
})
313+
})

apps/sim/lib/core/utils/sse.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export async function readSSEStream(
4545
const reader = body.getReader()
4646
const decoder = new TextDecoder()
4747
let accumulatedContent = ''
48+
let buffer = ''
4849

4950
try {
5051
while (true) {
@@ -53,10 +54,18 @@ export async function readSSEStream(
5354
}
5455

5556
const { done, value } = await reader.read()
56-
if (done) break
5757

58-
const chunk = decoder.decode(value)
59-
const lines = chunk.split('\n\n')
58+
if (done) {
59+
const remaining = decoder.decode()
60+
if (remaining) {
61+
buffer += remaining
62+
}
63+
break
64+
}
65+
66+
buffer += decoder.decode(value, { stream: true })
67+
const lines = buffer.split('\n\n')
68+
buffer = lines.pop() || ''
6069

6170
for (const line of lines) {
6271
if (line.startsWith('data: ')) {

bun.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
"glob": "13.0.0",
4343
"husky": "9.1.7",
4444
"lint-staged": "16.0.0",
45-
"turbo": "2.7.4"
45+
"turbo": "2.8.0"
4646
},
4747
"lint-staged": {
4848
"*.{js,jsx,ts,tsx,json,css,scss}": [

0 commit comments

Comments
 (0)