Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions benchmark/webstreams/readable-read-buffered.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';
const common = require('../common.js');
const { ReadableStream } = require('node:stream/web');

// Benchmark for reading from a pre-buffered ReadableStream.
// This measures the fast path optimization where data is already
// queued in the controller, avoiding DefaultReadRequest allocation.

// n: total number of chunks to read per run.
// bufferSize: number of chunks kept queued in the controller between
// pulls; it is also used as the stream's high-water mark so the queue
// stays non-empty and reads stay on the buffered fast path.
const bench = common.createBenchmark(main, {
  n: [1e5],
  bufferSize: [1, 10, 100, 1000],
});

// Drives one benchmark run: builds a ReadableStream whose controller
// queue is kept pre-filled with `bufferSize` chunks, then times `n`
// sequential reads through a default reader.
async function main({ n, bufferSize }) {
  let enqueued = 0;

  // Enqueue up to `limit` one-character chunks, tracking the total
  // produced so far. A non-positive limit is a no-op.
  const fill = (controller, limit) => {
    for (let i = 0; i < limit; i++) {
      controller.enqueue('a');
      enqueued++;
    }
  };

  const rs = new ReadableStream({
    // Pre-fill the queue before the first read.
    start(controller) {
      fill(controller, bufferSize);
    },
    // Refill on demand, never producing more than `n` chunks in total;
    // close once the quota has been reached.
    pull(controller) {
      fill(controller, Math.min(bufferSize, n - enqueued));
      if (enqueued >= n) {
        controller.close();
      }
    },
  }, {
    // High water mark equal to the batch size allows pre-buffering.
    highWaterMark: bufferSize,
  });

  const reader = rs.getReader();
  let lastChunk = null;
  let reads = 0;

  bench.start();
  // `reads` only advances on a successful (non-done) read, so the value
  // reported to bench.end() is the number of chunks actually consumed.
  for (; reads < n; reads++) {
    const { value, done } = await reader.read();
    if (done) break;
    lastChunk = value;
  }
  bench.end(reads);
  console.assert(lastChunk);
}
83 changes: 83 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,31 @@ class ReadableStreamDefaultReader {
new ERR_INVALID_STATE.TypeError(
'The reader is not attached to a stream'));
}

const stream = this[kState].stream;
const controller = stream[kState].controller;

// Fast path: if data is already buffered in a default controller,
// return a resolved promise immediately without creating a read request.
// This is spec-compliant because read() returns a Promise, and
// Promise.resolve() callbacks still run in the microtask queue.
if (stream[kState].state === 'readable' &&
isReadableStreamDefaultController(controller) &&
controller[kState].queue.length > 0) {
stream[kState].disturbed = true;
const chunk = dequeueValue(controller);

if (controller[kState].closeRequested && !controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(controller);
}

return PromiseResolve({ value: chunk, done: false });
}

// Slow path: create request and go through normal flow
const readRequest = new DefaultReadRequest();
readableStreamDefaultReaderRead(this, readRequest);
return readRequest.promise;
Expand Down Expand Up @@ -1286,6 +1311,8 @@ const isReadableStream =
isBrandCheck('ReadableStream');
const isReadableByteStreamController =
isBrandCheck('ReadableByteStreamController');
const isReadableStreamDefaultController =
isBrandCheck('ReadableStreamDefaultController');
const isReadableStreamBYOBRequest =
isBrandCheck('ReadableStreamBYOBRequest');
const isReadableStreamDefaultReader =
Expand Down Expand Up @@ -1510,6 +1537,62 @@ function readableStreamPipeTo(

await writer[kState].ready.promise;

const controller = source[kState].controller;

// Fast path: batch reads when data is buffered in a default controller.
// This avoids creating PipeToReadableStreamReadRequest objects and
// reduces promise allocation overhead.
if (source[kState].state === 'readable' &&
isReadableStreamDefaultController(controller) &&
controller[kState].queue.length > 0) {
let batchCount = 0;
const hwm = controller[kState].highWaterMark || 1;

while (controller[kState].queue.length > 0 && batchCount < hwm) {
if (shuttingDown) return true;

source[kState].disturbed = true;
const chunk = dequeueValue(controller);

if (controller[kState].closeRequested && !controller[kState].queue.length) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(source);
}

// Write the chunk - we're already in a separate microtask from enqueue
// because we awaited writer[kState].ready.promise above
state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
setPromiseHandled(state.currentWrite);

batchCount++;

// Check backpressure after each write
if (writer[kState].ready.promise !== undefined &&
dest[kState].state === 'writable') {
const desiredSize = writer.desiredSize;
if (desiredSize !== null && desiredSize <= 0) {
// Backpressure - stop batch and wait for ready
break;
}
}
}

// Trigger pull if needed after batch
if (source[kState].state === 'readable' &&
!controller[kState].closeRequested) {
readableStreamDefaultControllerCallPullIfNeeded(controller);
}

// Check if stream closed during batch
if (source[kState].state === 'closed') {
return true;
}

// Yield to microtask queue between batches to allow events/signals to fire
return false;
}

// Slow path: use read request for async reads
const promise = PromiseWithResolvers();
// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));
Expand Down
Loading