From 40fd8f0111da1ee65a1f50110e309c43a382f85d Mon Sep 17 00:00:00 2001
From: waleed
Date: Fri, 30 Jan 2026 11:26:58 -0800
Subject: [PATCH] fix(streaming): handle multi-byte UTF-8 chars split across chunks

TextDecoder#decode without { stream: true } flushes after every read, so
a multi-byte UTF-8 character split across two network chunks decoded to
U+FFFD replacement characters, and an SSE message split across reads was
silently dropped. Decode with { stream: true } and buffer incomplete
messages until their terminating blank line arrives. Also bumps turbo
from 2.7.4 to 2.8.0.

---
 apps/sim/lib/core/utils/response-format.ts |   1 -
 apps/sim/lib/core/utils/sse.test.ts        | 313 +++++++++++++++++++++
 apps/sim/lib/core/utils/sse.ts             |  15 +-
 bun.lock                                   |  16 +-
 package.json                               |   2 +-
 turbo.json                                 |   2 +-
 6 files changed, 335 insertions(+), 14 deletions(-)
 create mode 100644 apps/sim/lib/core/utils/sse.test.ts

diff --git a/apps/sim/lib/core/utils/response-format.ts b/apps/sim/lib/core/utils/response-format.ts
index 0e2ece1f19..7223f17e0f 100644
--- a/apps/sim/lib/core/utils/response-format.ts
+++ b/apps/sim/lib/core/utils/response-format.ts
@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
 
 const logger = createLogger('ResponseFormatUtils')
 
-// Type definitions for component data structures
 export interface Field {
   name: string
   type: string
diff --git a/apps/sim/lib/core/utils/sse.test.ts b/apps/sim/lib/core/utils/sse.test.ts
new file mode 100644
index 0000000000..524c00b83d
--- /dev/null
+++ b/apps/sim/lib/core/utils/sse.test.ts
@@ -0,0 +1,313 @@
+/**
+ * @vitest-environment node
+ */
+import { describe, expect, it, vi } from 'vitest'
+import { encodeSSE, readSSEStream, SSE_HEADERS } from '@/lib/core/utils/sse'
+
+function createStreamFromChunks(chunks: Uint8Array[]): ReadableStream {
+  let index = 0
+  return new ReadableStream({
+    pull(controller) {
+      if (index < chunks.length) {
+        controller.enqueue(chunks[index])
+        index++
+      } else {
+        controller.close()
+      }
+    },
+  })
+}
+
+function createSSEChunk(data: object): Uint8Array {
+  return new TextEncoder().encode(`data: ${JSON.stringify(data)}\n\n`)
+}
+
+describe('SSE_HEADERS', () => {
+  it.concurrent('should have correct Content-Type', () => {
+    expect(SSE_HEADERS['Content-Type']).toBe('text/event-stream')
+  })
+
+  it.concurrent('should have correct Cache-Control', () => {
+    expect(SSE_HEADERS['Cache-Control']).toBe('no-cache')
+  })
+
+  it.concurrent('should have Connection keep-alive', () => {
+    expect(SSE_HEADERS.Connection).toBe('keep-alive')
+  })
+
+  it.concurrent('should disable buffering', () => {
+    expect(SSE_HEADERS['X-Accel-Buffering']).toBe('no')
+  })
+})
+
+describe('encodeSSE', () => {
+  it.concurrent('should encode data as SSE format', () => {
+    const data = { chunk: 'hello' }
+    const result = encodeSSE(data)
+    const decoded = new TextDecoder().decode(result)
+    expect(decoded).toBe('data: {"chunk":"hello"}\n\n')
+  })
+
+  it.concurrent('should handle complex objects', () => {
+    const data = { chunk: 'test', nested: { value: 123 } }
+    const result = encodeSSE(data)
+    const decoded = new TextDecoder().decode(result)
+    expect(decoded).toBe('data: {"chunk":"test","nested":{"value":123}}\n\n')
+  })
+
+  it.concurrent('should handle strings with special characters', () => {
+    const data = { chunk: 'Hello, 世界! 🌍' }
+    const result = encodeSSE(data)
+    const decoded = new TextDecoder().decode(result)
+    expect(decoded).toContain('Hello, 世界! 🌍')
+  })
+})
+
+describe('readSSEStream', () => {
+  it.concurrent('should accumulate content from chunks', async () => {
+    const chunks = [
+      createSSEChunk({ chunk: 'Hello' }),
+      createSSEChunk({ chunk: ' World' }),
+      createSSEChunk({ done: true }),
+    ]
+    const stream = createStreamFromChunks(chunks)
+
+    const result = await readSSEStream(stream)
+    expect(result).toBe('Hello World')
+  })
+
+  it.concurrent('should call onChunk callback for each chunk', async () => {
+    const onChunk = vi.fn()
+    const chunks = [createSSEChunk({ chunk: 'A' }), createSSEChunk({ chunk: 'B' })]
+    const stream = createStreamFromChunks(chunks)
+
+    await readSSEStream(stream, { onChunk })
+
+    expect(onChunk).toHaveBeenCalledTimes(2)
+    expect(onChunk).toHaveBeenNthCalledWith(1, 'A')
+    expect(onChunk).toHaveBeenNthCalledWith(2, 'B')
+  })
+
+  it.concurrent('should call onAccumulated callback with accumulated content', async () => {
+    const onAccumulated = vi.fn()
+    const chunks = [createSSEChunk({ chunk: 'A' }), createSSEChunk({ chunk: 'B' })]
+    const stream = createStreamFromChunks(chunks)
+
+    await readSSEStream(stream, { onAccumulated })
+
+    expect(onAccumulated).toHaveBeenCalledTimes(2)
+    expect(onAccumulated).toHaveBeenNthCalledWith(1, 'A')
+    expect(onAccumulated).toHaveBeenNthCalledWith(2, 'AB')
+  })
+
+  it.concurrent('should skip [DONE] messages', async () => {
+    const encoder = new TextEncoder()
+    const chunks = [createSSEChunk({ chunk: 'content' }), encoder.encode('data: [DONE]\n\n')]
+    const stream = createStreamFromChunks(chunks)
+
+    const result = await readSSEStream(stream)
+    expect(result).toBe('content')
+  })
+
+  it.concurrent('should skip lines with error field', async () => {
+    const chunks = [
+      createSSEChunk({ error: 'Something went wrong' }),
+      createSSEChunk({ chunk: 'valid content' }),
+    ]
+    const stream = createStreamFromChunks(chunks)
+
+    const result = await readSSEStream(stream)
+    expect(result).toBe('valid content')
+  })
+
+  it.concurrent('should handle abort signal', async () => {
+    const controller = new AbortController()
+    controller.abort()
+
+    const chunks = [createSSEChunk({ chunk: 'content' })]
+    const stream = createStreamFromChunks(chunks)
+
+    const result = await readSSEStream(stream, { signal: controller.signal })
+    expect(result).toBe('')
+  })
+
+  it.concurrent('should skip unparseable lines', async () => {
+    const encoder = new TextEncoder()
+    const chunks = [encoder.encode('data: invalid-json\n\n'), createSSEChunk({ chunk: 'valid' })]
+    const stream = createStreamFromChunks(chunks)
+
+    const result = await readSSEStream(stream)
+    expect(result).toBe('valid')
+  })
+
+  describe('multi-byte UTF-8 character handling', () => {
+    it.concurrent('should handle Turkish characters split across chunks', async () => {
+      const text = 'Merhaba dünya! Öğretmen şarkı söyledi.'
+      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+      const bytes = new TextEncoder().encode(fullData)
+
+      const splitPoint = Math.floor(bytes.length / 2)
+      const chunk1 = bytes.slice(0, splitPoint)
+      const chunk2 = bytes.slice(splitPoint)
+
+      const stream = createStreamFromChunks([chunk1, chunk2])
+      const result = await readSSEStream(stream)
+      expect(result).toBe(text)
+    })
+
+    it.concurrent('should handle emoji split across chunks', async () => {
+      const text = 'Hello 🚀 World 🌍 Test 🎯'
+      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+      const bytes = new TextEncoder().encode(fullData)
+
+      const emojiIndex = fullData.indexOf('🚀')
+      const byteOffset = new TextEncoder().encode(fullData.slice(0, emojiIndex)).length
+      const splitPoint = byteOffset + 2
+
+      const chunk1 = bytes.slice(0, splitPoint)
+      const chunk2 = bytes.slice(splitPoint)
+
+      const stream = createStreamFromChunks([chunk1, chunk2])
+      const result = await readSSEStream(stream)
+      expect(result).toBe(text)
+    })
+
+    it.concurrent('should handle CJK characters split across chunks', async () => {
+      const text = '你好世界!日本語テスト。한국어도 됩니다.'
+      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+      const bytes = new TextEncoder().encode(fullData)
+
+      const third = Math.floor(bytes.length / 3)
+      const chunk1 = bytes.slice(0, third)
+      const chunk2 = bytes.slice(third, third * 2)
+      const chunk3 = bytes.slice(third * 2)
+
+      const stream = createStreamFromChunks([chunk1, chunk2, chunk3])
+      const result = await readSSEStream(stream)
+      expect(result).toBe(text)
+    })
+
+    it.concurrent('should handle mixed multi-byte content split at byte boundaries', async () => {
+      const text = 'Ö is Turkish, 中 is Chinese, 🎉 is emoji'
+      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+      const bytes = new TextEncoder().encode(fullData)
+
+      const chunks: Uint8Array[] = []
+      for (let i = 0; i < bytes.length; i += 3) {
+        chunks.push(bytes.slice(i, Math.min(i + 3, bytes.length)))
+      }
+
+      const stream = createStreamFromChunks(chunks)
+      const result = await readSSEStream(stream)
+      expect(result).toBe(text)
+    })
+
+    it.concurrent('should handle SSE message split across chunks', async () => {
+      const encoder = new TextEncoder()
+      const message1 = { chunk: 'First' }
+      const message2 = { chunk: 'Second' }
+
+      const fullText = `data: ${JSON.stringify(message1)}\n\ndata: ${JSON.stringify(message2)}\n\n`
+      const bytes = encoder.encode(fullText)
+
+      const delimiterIndex = fullText.indexOf('\n\n') + 1
+      const byteOffset = encoder.encode(fullText.slice(0, delimiterIndex)).length
+
+      const chunk1 = bytes.slice(0, byteOffset)
+      const chunk2 = bytes.slice(byteOffset)
+
+      const stream = createStreamFromChunks([chunk1, chunk2])
+      const result = await readSSEStream(stream)
+      expect(result).toBe('FirstSecond')
+    })
+
+    it.concurrent('should handle 2-byte UTF-8 character (Ö) split at byte boundary', async () => {
+      const text = 'AÖB'
+      const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+      const bytes = new TextEncoder().encode(fullData)
+
+      const charIndex = fullData.indexOf('Ö')
+      const byteOffset = new TextEncoder().encode(fullData.slice(0, charIndex)).length
+
+      const chunk1 = bytes.slice(0, byteOffset + 1)
+      const chunk2 = bytes.slice(byteOffset + 1)
+
+      const stream = createStreamFromChunks([chunk1, chunk2])
+      const result = await readSSEStream(stream)
+      expect(result).toBe(text)
+    })
+
+    it.concurrent(
+      'should handle 3-byte UTF-8 character (中) split at byte boundaries',
+      async () => {
+        const text = 'A中B'
+        const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+        const bytes = new TextEncoder().encode(fullData)
+
+        const charIndex = fullData.indexOf('中')
+        const byteOffset = new TextEncoder().encode(fullData.slice(0, charIndex)).length
+
+        const chunk1 = bytes.slice(0, byteOffset + 1)
+        const chunk2 = bytes.slice(byteOffset + 1, byteOffset + 2)
+        const chunk3 = bytes.slice(byteOffset + 2)
+
+        const stream = createStreamFromChunks([chunk1, chunk2, chunk3])
+        const result = await readSSEStream(stream)
+        expect(result).toBe(text)
+      }
+    )
+
+    it.concurrent(
+      'should handle 4-byte UTF-8 character (🚀) split at byte boundaries',
+      async () => {
+        const text = 'A🚀B'
+        const fullData = `data: ${JSON.stringify({ chunk: text })}\n\n`
+        const bytes = new TextEncoder().encode(fullData)
+
+        const charIndex = fullData.indexOf('🚀')
+        const byteOffset = new TextEncoder().encode(fullData.slice(0, charIndex)).length
+
+        const chunk1 = bytes.slice(0, byteOffset + 1)
+        const chunk2 = bytes.slice(byteOffset + 1, byteOffset + 2)
+        const chunk3 = bytes.slice(byteOffset + 2, byteOffset + 3)
+        const chunk4 = bytes.slice(byteOffset + 3)
+
+        const stream = createStreamFromChunks([chunk1, chunk2, chunk3, chunk4])
+        const result = await readSSEStream(stream)
+        expect(result).toBe(text)
+      }
+    )
+  })
+
+  describe('SSE message buffering', () => {
+    it.concurrent('should handle incomplete SSE message waiting for more data', async () => {
+      const encoder = new TextEncoder()
+
+      const chunk1 = encoder.encode('data: {"chu')
+      const chunk2 = encoder.encode('nk":"hello"}\n\n')
+
+      const stream = createStreamFromChunks([chunk1, chunk2])
+      const result = await readSSEStream(stream)
+      expect(result).toBe('hello')
+    })
+
+    it.concurrent('should handle multiple complete messages in one chunk', async () => {
+      const encoder = new TextEncoder()
+
+      const multiMessage = 'data: {"chunk":"A"}\n\ndata: {"chunk":"B"}\n\ndata: {"chunk":"C"}\n\n'
+      const chunk = encoder.encode(multiMessage)
+
+      const stream = createStreamFromChunks([chunk])
+      const result = await readSSEStream(stream)
+      expect(result).toBe('ABC')
+    })
+
+    it.concurrent('should handle message that ends exactly at chunk boundary', async () => {
+      const chunks = [createSSEChunk({ chunk: 'First' }), createSSEChunk({ chunk: 'Second' })]
+      const stream = createStreamFromChunks(chunks)
+
+      const result = await readSSEStream(stream)
+      expect(result).toBe('FirstSecond')
+    })
+  })
+})
diff --git a/apps/sim/lib/core/utils/sse.ts b/apps/sim/lib/core/utils/sse.ts
index f6742d1157..9d9d3f785a 100644
--- a/apps/sim/lib/core/utils/sse.ts
+++ b/apps/sim/lib/core/utils/sse.ts
@@ -45,6 +45,7 @@ export async function readSSEStream(
   const reader = body.getReader()
   const decoder = new TextDecoder()
   let accumulatedContent = ''
+  let buffer = ''
 
   try {
     while (true) {
@@ -53,10 +54,18 @@ export async function readSSEStream(
       }
 
       const { done, value } = await reader.read()
-      if (done) break
 
-      const chunk = decoder.decode(value)
-      const lines = chunk.split('\n\n')
+      if (done) {
+        const remaining = decoder.decode()
+        if (remaining) {
+          buffer += remaining
+        }
+        break
+      }
+
+      buffer += decoder.decode(value, { stream: true })
+      const lines = buffer.split('\n\n')
+      buffer = lines.pop() || ''
 
       for (const line of lines) {
         if (line.startsWith('data: ')) {
diff --git a/bun.lock b/bun.lock
index 73c20392c1..372d3e8298 100644
--- a/bun.lock
+++ b/bun.lock
@@ -13,7 +13,7 @@
         "glob": "13.0.0",
         "husky": "9.1.7",
         "lint-staged": "16.0.0",
"turbo": "2.7.4", + "turbo": "2.8.0", }, }, "apps/docs": { @@ -3425,19 +3425,19 @@ "tunnel-agent": ["tunnel-agent@0.6.0", "", { "dependencies": { "safe-buffer": "^5.0.1" } }, "sha512-McnNiV1l8RYeY8tBgEpuodCC1mLUdbSN+CYBL7kJsJNInOP8UjDDEwdk6Mw60vdLLrr5NHKZhMAOSrR2NZuQ+w=="], - "turbo": ["turbo@2.7.4", "", { "optionalDependencies": { "turbo-darwin-64": "2.7.4", "turbo-darwin-arm64": "2.7.4", "turbo-linux-64": "2.7.4", "turbo-linux-arm64": "2.7.4", "turbo-windows-64": "2.7.4", "turbo-windows-arm64": "2.7.4" }, "bin": { "turbo": "bin/turbo" } }, "sha512-bkO4AddmDishzJB2ze7aYYPaejMoJVfS0XnaR6RCdXFOY8JGJfQE+l9fKiV7uDPa5Ut44gmOWJL3894CIMeH9g=="], + "turbo": ["turbo@2.8.0", "", { "optionalDependencies": { "turbo-darwin-64": "2.8.0", "turbo-darwin-arm64": "2.8.0", "turbo-linux-64": "2.8.0", "turbo-linux-arm64": "2.8.0", "turbo-windows-64": "2.8.0", "turbo-windows-arm64": "2.8.0" }, "bin": { "turbo": "bin/turbo" } }, "sha512-hYbxnLEdvJF+DLALS+Ia+PbfNtn0sDP0hH2u9AFoskSUDmcVHSrtwHpzdX94MrRJKo9D9tYxY3MyP20gnlrWyA=="], - "turbo-darwin-64": ["turbo-darwin-64@2.7.4", "", { "os": "darwin", "cpu": "x64" }, "sha512-xDR30ltfkSsRfGzABBckvl1nz1cZ3ssTujvdj+TPwOweeDRvZ0e06t5DS0rmRBvyKpgGs42K/EK6Mn2qLlFY9A=="], + "turbo-darwin-64": ["turbo-darwin-64@2.8.0", "", { "os": "darwin", "cpu": "x64" }, "sha512-N7f4PYqz25yk8c5kituk09bJ89tE4wPPqKXgYccT6nbEQnGnrdvlyCHLyqViNObTgjjrddqjb1hmDkv7VcxE0g=="], - "turbo-darwin-arm64": ["turbo-darwin-arm64@2.7.4", "", { "os": "darwin", "cpu": "arm64" }, "sha512-P7sjqXtOL/+nYWPvcDGWhi8wf8M8mZHHB8XEzw2VX7VJrS8IGHyJHGD1AYfDvhAEcr7pnk3gGifz3/xyhI655w=="], + "turbo-darwin-arm64": ["turbo-darwin-arm64@2.8.0", "", { "os": "darwin", "cpu": "arm64" }, "sha512-eVzejaP5fn51gmJAPW68U6mSjFaAZ26rPiE36mMdk+tMC4XBGmJHT/fIgrhcrXMvINCl27RF8VmguRe+MBlSuQ=="], - "turbo-linux-64": ["turbo-linux-64@2.7.4", "", { "os": "linux", "cpu": "x64" }, "sha512-GofFOxRO/IhG8BcPyMSSB3Y2+oKQotsaYbHxL9yD6JPb20/o35eo+zUSyazOtilAwDHnak5dorAJFoFU8MIg2A=="], + "turbo-linux-64": ["turbo-linux-64@2.8.0", "", { "os": "linux", "cpu": "x64" }, "sha512-ILR45zviYae3icf4cmUISdj8X17ybNcMh3Ms4cRdJF5sS50qDDTv8qeWqO/lPeHsu6r43gVWDofbDZYVuXYL7Q=="], - "turbo-linux-arm64": ["turbo-linux-arm64@2.7.4", "", { "os": "linux", "cpu": "arm64" }, "sha512-+RQKgNjksVPxYAyAgmDV7w/1qj++qca+nSNTAOKGOfJiDtSvRKoci89oftJ6anGs00uamLKVEQ712TI/tfNAIw=="], + "turbo-linux-arm64": ["turbo-linux-arm64@2.8.0", "", { "os": "linux", "cpu": "arm64" }, "sha512-z9pUa8ENFuHmadPfjEYMRWlXO82t1F/XBDx2XTg+cWWRZHf85FnEB6D4ForJn/GoKEEvwdPhFLzvvhOssom2ug=="], - "turbo-windows-64": ["turbo-windows-64@2.7.4", "", { "os": "win32", "cpu": "x64" }, "sha512-rfak1+g+ON3czs1mDYsCS4X74ZmK6gOgRQTXjDICtzvR4o61paqtgAYtNPofcVsMWeF4wvCajSeoAkkeAnQ1kg=="], + "turbo-windows-64": ["turbo-windows-64@2.8.0", "", { "os": "win32", "cpu": "x64" }, "sha512-J6juRSRjmSErEqJCv7nVIq2DgZ2NHXqyeV8NQTFSyIvrThKiWe7FDOO6oYpuR06+C1NW82aoN4qQt4/gYvz25w=="], - "turbo-windows-arm64": ["turbo-windows-arm64@2.7.4", "", { "os": "win32", "cpu": "arm64" }, "sha512-1ZgBNjNRbDu/fPeqXuX9i26x3CJ/Y1gcwUpQ+Vp7kN9Un6RZ9kzs164f/knrjcu5E+szCRexVjRSJay1k5jApA=="], + "turbo-windows-arm64": ["turbo-windows-arm64@2.8.0", "", { "os": "win32", "cpu": "arm64" }, "sha512-qarBZvCu6uka35739TS+y/3CBU3zScrVAfohAkKHG+So+93Wn+5tKArs8HrO2fuTaGou8fMIeTV7V5NgzCVkSQ=="], "tweetnacl": ["tweetnacl@0.14.5", "", {}, "sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA=="], diff --git a/package.json b/package.json index 586e450e07..25613e1a36 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 
@@ "glob": "13.0.0", "husky": "9.1.7", "lint-staged": "16.0.0", - "turbo": "2.7.4" + "turbo": "2.8.0" }, "lint-staged": { "*.{js,jsx,ts,tsx,json,css,scss}": [ diff --git a/turbo.json b/turbo.json index 29ab9cea45..8a73392ae2 100644 --- a/turbo.json +++ b/turbo.json @@ -1,5 +1,5 @@ { - "$schema": "https://turbo.build/schema.json", + "$schema": "https://v2-8-0.turborepo.dev/schema.json", "envMode": "loose", "tasks": { "build": {