diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 7b635636157..cc0361e6b8d 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -4857,6 +4857,7 @@ export function processBinaryChunk( resolvedRowTag === 65 /* "A" */ || resolvedRowTag === 79 /* "O" */ || resolvedRowTag === 111 /* "o" */ || + resolvedRowTag === 98 /* "b" */ || resolvedRowTag === 85 /* "U" */ || resolvedRowTag === 83 /* "S" */ || resolvedRowTag === 115 /* "s" */ || @@ -4916,14 +4917,31 @@ export function processBinaryChunk( // We found the last chunk of the row const length = lastIdx - i; const lastChunk = new Uint8Array(chunk.buffer, offset, length); - processFullBinaryRow( - response, - streamState, - rowID, - rowTag, - buffer, - lastChunk, - ); + + // Check if this is a Uint8Array for a byte stream. We enqueue it + // immediately but need to determine if we can use zero-copy or must copy. + if (rowTag === 98 /* "b" */) { + resolveBuffer( + response, + rowID, + // If we're at the end of the RSC chunk, no more parsing will access + // this buffer and we don't need to copy the chunk to allow detaching + // the buffer, otherwise we need to copy. + lastIdx === chunkLength ? lastChunk : lastChunk.slice(), + streamState, + ); + } else { + // Process all other row types. + processFullBinaryRow( + response, + streamState, + rowID, + rowTag, + buffer, + lastChunk, + ); + } + // Reset state machine for a new row i = lastIdx; if (rowState === ROW_CHUNK_BY_NEWLINE) { @@ -4936,14 +4954,27 @@ export function processBinaryChunk( rowLength = 0; buffer.length = 0; } else { - // The rest of this row is in a future chunk. We stash the rest of the - // current chunk until we can process the full row. + // The rest of this row is in a future chunk. const length = chunk.byteLength - i; const remainingSlice = new Uint8Array(chunk.buffer, offset, length); - buffer.push(remainingSlice); - // Update how many bytes we're still waiting for. If we're looking for - // a newline, this doesn't hurt since we'll just ignore it. - rowLength -= remainingSlice.byteLength; + + // For byte streams, we can enqueue the partial row immediately without + // copying since we're at the end of the RSC chunk and no more parsing + // will access this buffer. + if (rowTag === 98 /* "b" */) { + // Update how many bytes we're still waiting for. We need to do this + // before enqueueing, as enqueue will detach the buffer and byteLength + // will become 0. + rowLength -= remainingSlice.byteLength; + resolveBuffer(response, rowID, remainingSlice, streamState); + } else { + // For other row types, stash the rest of the current chunk until we can + // process the full row. + buffer.push(remainingSlice); + // Update how many bytes we're still waiting for. If we're looking for + // a newline, this doesn't hurt since we'll just ignore it. + rowLength -= remainingSlice.byteLength; + } break; } } diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js index ec2d42201b9..0bd60cb5e4c 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMEdge-test.js @@ -5,18 +5,11 @@ * LICENSE file in the root directory of this source tree. * * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - let clientExports; let turbopackMap; let turbopackModules; diff --git a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js index cbc06ef80d5..09803ab017c 100644 --- a/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js +++ b/packages/react-server-dom-turbopack/src/__tests__/ReactFlightTurbopackDOMReplyEdge-test.js @@ -5,17 +5,11 @@ * LICENSE file in the root directory of this source tree. * * @emails react-core + * @jest-environment ./scripts/jest/ReactDOMServerIntegrationEnvironment */ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -// let serverExports; let turbopackServerMap; let ReactServerDOMServer; let ReactServerDOMClient; @@ -29,7 +23,6 @@ describe('ReactFlightDOMTurbopackReply', () => { require('react-server-dom-turbopack/server.edge'), ); const TurbopackMock = require('./utils/TurbopackMock'); - // serverExports = TurbopackMock.serverExports; turbopackServerMap = TurbopackMock.turbopackServerMap; ReactServerDOMServer = require('react-server-dom-turbopack/server.edge'); jest.resetModules(); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index f5758ec22cf..5f7dfa516eb 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.WritableStream = - require('web-streams-polyfill/ponyfill/es6').WritableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} // Patch for Edge environments for global scope global.AsyncLocalStorage = require('async_hooks').AsyncLocalStorage; @@ -127,8 +115,16 @@ describe('ReactFlightDOMEdge', () => { chunk.set(prevChunk, 0); chunk.set(value, prevChunk.length); if (chunk.length > 50) { + // Copy the part we're keeping (prevChunk) to avoid buffer + // transfer. When we enqueue the partial chunk below, downstream + // consumers (like byte streams in the Flight Client) may detach + // the underlying buffer. Since prevChunk would share the same + // buffer, we copy it first so it has its own independent buffer. + // TODO: Should we just use {type: 'bytes'} for this stream to + // always transfer ownership, and not only "accidentally" when we + // enqueue in the Flight Client? + prevChunk = chunk.slice(chunk.length - 50); controller.enqueue(chunk.subarray(0, chunk.length - 50)); - prevChunk = chunk.subarray(chunk.length - 50); } else { // Wait to see if we get some more bytes to join in. prevChunk = chunk; @@ -1118,25 +1114,121 @@ describe('ReactFlightDOMEdge', () => { expect(streamedBuffers).toEqual(buffers); }); + it('should support binary ReadableStreams', async () => { + const encoder = new TextEncoder(); + const words = ['Hello', 'streaming', 'world']; + + const stream = new ReadableStream({ + type: 'bytes', + async start(controller) { + for (let i = 0; i < words.length; i++) { + const chunk = encoder.encode(words[i] + ' '); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + rscStream, + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const decoder = new TextDecoder(); + + let text = ''; + let entry; + while (!(entry = await reader.read()).done) { + text += decoder.decode(entry.value); + } + + expect(text).toBe('Hello streaming world '); + }); + + it('should support large binary ReadableStreams', async () => { + const chunkCount = 100; + const chunkSize = 1024; + const expectedBytes = []; + + const stream = new ReadableStream({ + type: 'bytes', + start(controller) { + for (let i = 0; i < chunkCount; i++) { + const chunk = new Uint8Array(chunkSize); + for (let j = 0; j < chunkSize; j++) { + chunk[j] = (i + j) % 256; + } + expectedBytes.push(...Array.from(chunk)); + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const rscStream = await serverAct(() => + ReactServerDOMServer.renderToReadableStream(stream, {}), + ); + + const result = await ReactServerDOMClient.createFromReadableStream( + // Use passThrough to split and rejoin chunks at arbitrary boundaries. + passThrough(rscStream), + { + serverConsumerManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + const reader = result.getReader(); + const receivedBytes = []; + let entry; + while (!(entry = await reader.read()).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + receivedBytes.push(...Array.from(entry.value)); + } + + expect(receivedBytes).toEqual(expectedBytes); + }); + it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -1176,11 +1268,7 @@ describe('ReactFlightDOMEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); // @gate !__DEV__ || enableComponentPerformanceTrack diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index 7315e78c619..b874383dcdd 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -10,18 +10,6 @@ 'use strict'; -// Polyfills for test environment -global.ReadableStream = - require('web-streams-polyfill/ponyfill/es6').ReadableStream; -global.TextEncoder = require('util').TextEncoder; -global.TextDecoder = require('util').TextDecoder; - -global.Blob = require('buffer').Blob; -if (typeof File === 'undefined' || typeof FormData === 'undefined') { - global.File = require('buffer').File || require('undici').File; - global.FormData = require('undici').FormData; -} - let serverExports; let webpackServerMap; let ReactServerDOMServer; @@ -194,24 +182,33 @@ describe('ReactFlightDOMReplyEdge', () => { }); it('should support BYOB binary ReadableStreams', async () => { - const buffer = new Uint8Array([ + const sourceBytes = [ 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]).buffer; + ]; + + // Create separate buffers for each typed array to avoid ArrayBuffer + // transfer issues. Each view needs its own buffer because enqueue() + // transfers ownership. const buffers = [ - new Int8Array(buffer, 1), - new Uint8Array(buffer, 2), - new Uint8ClampedArray(buffer, 2), - new Int16Array(buffer, 2), - new Uint16Array(buffer, 2), - new Int32Array(buffer, 4), - new Uint32Array(buffer, 4), - new Float32Array(buffer, 4), - new Float64Array(buffer, 0), - new BigInt64Array(buffer, 0), - new BigUint64Array(buffer, 0), - new DataView(buffer, 3), + new Int8Array(sourceBytes.slice(1)), + new Uint8Array(sourceBytes.slice(2)), + new Uint8ClampedArray(sourceBytes.slice(2)), + new Int16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Uint16Array(new Uint8Array(sourceBytes.slice(2)).buffer), + new Int32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Uint32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float32Array(new Uint8Array(sourceBytes.slice(4)).buffer), + new Float64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigInt64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new BigUint64Array(new Uint8Array(sourceBytes.slice(0)).buffer), + new DataView(new Uint8Array(sourceBytes.slice(3)).buffer), ]; + // Save expected bytes before enqueueing (which will detach the buffers). + const expectedBytes = buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ); + // This a binary stream where each chunk ends up as Uint8Array. const s = new ReadableStream({ type: 'bytes', @@ -239,11 +236,7 @@ describe('ReactFlightDOMReplyEdge', () => { // The streamed buffers might be in different chunks and in Uint8Array form but // the concatenated bytes should be the same. - expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( - buffers.flatMap(c => - Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), - ), - ); + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(expectedBytes); }); it('should abort when parsing an incomplete payload', async () => { diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 1f5e24ff596..f9729b535e0 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -1149,6 +1149,8 @@ function serializeReadableStream( supportsBYOB = false; } } + // At this point supportsBYOB is guaranteed to be a boolean. + const isByteStream: boolean = supportsBYOB; const reader = stream.getReader(); @@ -1172,7 +1174,7 @@ function serializeReadableStream( // The task represents the Stop row. This adds a Start row. request.pendingChunks++; const startStreamRow = - streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + streamTask.id.toString(16) + ':' + (isByteStream ? 'r' : 'R') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); function progress(entry: {done: boolean, value: ReactClientValue, ...}) { @@ -1190,9 +1192,15 @@ function serializeReadableStream( callOnAllReadyIfReady(request); } else { try { - streamTask.model = entry.value; request.pendingChunks++; - tryStreamTask(request, streamTask); + streamTask.model = entry.value; + if (isByteStream) { + // Chunks of byte streams are always Uint8Array instances. + const chunk: Uint8Array = (streamTask.model: any); + emitTypedArrayChunk(request, streamTask.id, 'b', chunk, false); + } else { + tryStreamTask(request, streamTask); + } enqueueFlush(request); reader.read().then(progress, error); } catch (x) {