From 93fc57400b0cf9a2bfdc2765d94ee9984f2fd97f Mon Sep 17 00:00:00 2001 From: Hendrik Liebau Date: Thu, 13 Nov 2025 21:23:02 +0100 Subject: [PATCH] [Flight] Fix broken byte stream parsing caused by buffer detachment (#35127) This PR fixes a critical bug where `ReadableStream({type: 'bytes'})` instances passed through React Server Components (RSC) would stall after reading only the first chunk or the first few chunks in the client. This issue was masked by using `web-streams-polyfill` in tests, but manifests with native Web Streams implementations. The root cause is that when a chunk is enqueued to a `ReadableByteStreamController`, the spec requires the underlying ArrayBuffer to be synchronously transferred/detached. In the React Flight Client's chunk parsing, embedded byte stream chunks are created as views into the incoming RSC stream chunk buffer using `new Uint8Array(chunk.buffer, offset, length)`. When embedded byte stream chunks are enqueued, they can detach the shared buffer, leaving the RSC stream parsing in a broken state. The fix is to copy embedded byte stream chunks before enqueueing them, preventing buffer detachment from affecting subsequent parsing. To not affect performance too much, we use a zero-copy optimization: when a chunk ends exactly at the end of the RSC stream chunk, or when the row spans into the next RSC chunk, no further parsing will access that buffer, so we can safely enqueue the view directly without copying. We now also enqueue embedded byte stream chunks immediately as they are parsed, without waiting for the full row to complete. To simplify the logic in the client, we introduce a new `'b'` protocol tag specifically for byte stream chunks. The server now emits `'b'` instead of `'o'` for `Uint8Array` chunks from byte streams (detected via `supportsBYOB`). This allows the client to recognize byte stream chunks without needing to track stream IDs. Tests now use the proper Jest environment with native Web Streams instead of polyfills, exposing and validating the fix for this issue. --- .../react-client/src/ReactFlightClient.js | 59 +++++-- .../ReactFlightTurbopackDOMEdge-test.js | 9 +- .../ReactFlightTurbopackDOMReplyEdge-test.js | 9 +- .../src/__tests__/ReactFlightDOMEdge-test.js | 152 ++++++++++++++---- .../__tests__/ReactFlightDOMReplyEdge-test.js | 55 +++---- .../react-server/src/ReactFlightServer.js | 14 +- 6 files changed, 202 insertions(+), 96 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 7b635636157..cc0361e6b8d 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -4857,6 +4857,7 @@ export function processBinaryChunk( resolvedRowTag === 65 /* "A" */ || resolvedRowTag === 79 /* "O" */ || resolvedRowTag === 111 /* "o" */ || + resolvedRowTag === 98 /* "b" */ || resolvedRowTag === 85 /* "U" */ || resolvedRowTag === 83 /* "S" */ || resolvedRowTag === 115 /* "s" */ || @@ -4916,14 +4917,31 @@ export function processBinaryChunk( // We found the last chunk of the row const length = lastIdx - i; const lastChunk = new Uint8Array(chunk.buffer, offset, length); - processFullBinaryRow( - response, - streamState, - rowID, - rowTag, - buffer, - lastChunk, - ); + + // Check if this is a Uint8Array for a byte stream. We enqueue it + // immediately but need to determine if we can use zero-copy or must copy. + if (rowTag === 98 /* "b" */) { + resolveBuffer( + response, + rowID, + // If we're at the end of the RSC chunk, no more parsing will access + // this buffer and we don't need to copy the chunk to allow detaching + // the buffer, otherwise we need to copy. + lastIdx === chunkLength ? lastChunk : lastChunk.slice(), + streamState, + ); + } else { + // Process all other row types. + processFullBinaryRow( + response, + streamState, + rowID, + rowTag, + buffer, + lastChunk, + ); + } + // Reset state machine for a new row i = lastIdx; if (rowState === ROW_CHUNK_BY_NEWLINE) { @@ -4936,14 +4954,27 @@ export function processBinaryChunk( rowLength = 0; buffer.length = 0; } else { - // The rest of this row is in a future chunk. We stash the rest of the - // current chunk until we can process the full row. + // The rest of this row is in a future chunk. const length = chunk.byteLength - i; const remainingSlice = new Uint8Array(chunk.buffer, offset, length); - buffer.push(remainingSlice); - // Update how many bytes we're still waiting for. If we're looking for - // a newline, this doesn't hurt since we'll just ignore it. - rowLength -= remainingSlice.byteLength; + + // For byte streams, we can enqueue the partial row immediately without + // copying since we're at the end of the RSC chunk and no more parsing + // will access this buffer. + if (rowTag === 98 /* "b" */) { + // Update how many bytes we're still waiting for. We need to do this + // before enqueueing, as enqueue will detach the buffer and byteLength + // will become 0. + rowLength -= remainingSlice.byteLength; + resolveBuffer(response, rowID, remainingSlice, streamState); + } else { + // For other row types, stash the rest of the current chunk until we can + // process the full row. + buffer.push(remainingSlice); + // Update how many bytes we're still waiting for. If we're looking for + // a newline, this doesn't hurt since we'll just ignore it. + rowLength -= remainingSlice.byteLength; + } break; } } diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js index ec2d42201b9..0bd60cb5e4c 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js @@ -5,18 +5,11 @@ * LICENSE file in the root directory of this source tree. * * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - let clientExports; let turbopackMap; let turbopackModules; diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js index cbc06ef80d5..09803ab017c 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js @@ -5,17 +5,11 @@ * LICENSE file in the root directory of this source tree. * * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -// let serverExports; let turbopackServerMap; let ReactServerDOMServer; let ReactServerDOMClient; @@ -29,7 +23,6 @@ describe('ReactFlightDOMTurbopackReply', () => { require('react-server-dom-turbopack/server.edge'), ); const TurbopackMock = require('./utils/TurbopackMock'); - // serverExports = TurbopackMock.serverExports; turbopackServerMap = TurbopackMock.turbopackServerMap; ReactServerDOMServer = require('react-server-dom-turbopack/server.edge'); jest.resetModules(); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index f5758ec22cf..5f7dfa516eb 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} // Patch for Edge environments for global scope global.AsyncLocalStorage = require('async_hooks').AsyncLocalStorage; @@ -127,8 +115,16 @@ describe('ReactFlightDOMEdge', () => { chunk.set(prevChunk, 0); chunk.set(value, prevChunk.length); if (chunk.length > 50) { + // Copy the part we're keeping (prevChunk) to avoid buffer + // transfer. When we enqueue the partial chunk below, downstream + // consumers (like byte streams in the Flight Client) may detach + // the underlying buffer. Since prevChunk would share the same + // buffer, we copy it first so it has its own independent buffer. + // TODO: Should we just use {type: 'bytes'} for this stream to + // always transfer ownership, and not only "accidentally" when we + // enqueue in the Flight Client? + prevChunk = chunk.slice(chunk.length - 50); controller.enqueue(chunk.subarray(0, chunk.length - 50)); - prevChunk = chunk.subarray(chunk.length - 50); } else { // Wait to see if we get some more bytes to join in. prevChunk = chunk; @@ -1118,25 +1114,121 @@ describe('ReactFlightDOMEdge', () => { expect(streamedBuffers).toEqual(buffers); }); + it('should support binary ReadableStreams', async () => { + const encoder = new TextEncoder(); + const words = ['Hello', 'streaming', 'world']; + + const stream = new ReadableStream({ + type: 'bytes', + async start(controller) { + for (let i = 0; i < words.length; i++) { + const chunk = encoder.encode(words[i] + ' '); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + rscStream, + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const decoder = new TextDecoder(); + + let text = ''; + let entry; + while (!(entry = await reader.read()).done) { + text += decoder.decode(entry.value); + } + + expect(text).toBe('Hello streaming world '); + }); + + it('should support large binary ReadableStreams', async () => { + const chunkCount = 100; + const chunkSize = 1024; + const expectedBytes = []; + + const stream = new ReadableStream({ + type: 'bytes', + start(controller) { + for (let i = 0; i < chunkCount; i++) { + const chunk = new Uint8Array(chunkSize); + for (let j = 0; j < chunkSize; j++) { + chunk[j] = (i + j) % 256; + } + expectedBytes.push(...Array.from(chunk)); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + // Use passThrough to split and rejoin chunks at arbitrary boundaries. + passThrough(rscStream), + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const receivedBytes = []; + let entry; + while (!(entry = await reader.read()).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + receivedBytes.push(...Array.from(entry.value)); + } + + expect(receivedBytes).toEqual(expectedBytes); + }); + it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -1176,11 +1268,7 @@ describe('ReactFlightDOMEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); // @gate !__DEV__ || enableComponentPerformanceTrack diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index 7315e78c619..b874383dcdd 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} - let serverExports; let webpackServerMap; let ReactServerDOMServer; @@ -194,24 +182,33 @@ describe('ReactFlightDOMReplyEdge', () => { }); it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -239,11 +236,7 @@ describe('ReactFlightDOMReplyEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); it('should abort when parsing an incomplete payload', async () => { diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 1f5e24ff596..f9729b535e0 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -1149,6 +1149,8 @@ function serializeReadableStream( supportsBYOB = false; } } + // At this point supportsBYOB is guaranteed to be a boolean. + const isByteStream: boolean = supportsBYOB; const reader = stream.getReader(); @@ -1172,7 +1174,7 @@ function serializeReadableStream( // The task represents the Stop row. This adds a Start row. request.pendingChunks++; const startStreamRow = - streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + streamTask.id.toString(16) + ':' + (isByteStream ? 'r' : 'R') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); function progress(entry: {done: boolean, value: ReactClientValue, ...}) { @@ -1190,9 +1192,15 @@ function serializeReadableStream( callOnAllReadyIfReady(request); } else { try { - streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask); + streamTask.model = entry.value; + if (isByteStream) { + // Chunks of byte streams are always Uint8Array instances. + const chunk: Uint8Array = (streamTask.model: any); + emitTypedArrayChunk(request, streamTask.id, 'b', chunk, false); + } else { + tryStreamTask(request, streamTask); + } enqueueFlush(request); reader.read().then(progress, error); } catch (x) {