Skip to content

Commit 199daab

Browse files
mcollinacramforce
andauthored
stream: add fast paths for webstreams read and pipeTo
Add internal fast paths to improve webstreams performance without changing the public API or breaking spec compliance. 1. ReadableStreamDefaultReader.read() fast path: When data is already buffered in the controller's queue, return PromiseResolve() directly without creating a DefaultReadRequest object. This is spec-compliant because read() returns a Promise, and resolved promises still run callbacks in the microtask queue. 2. pipeTo() batch read fast path: When data is buffered, batch reads directly from the controller queue up to highWaterMark without creating PipeToReadableStreamReadRequest objects per chunk. Respects backpressure by checking desiredSize after each write. Benchmark results: - pipeTo: ~11% faster (***) - buffered read(): ~17-20% faster (***) Co-Authored-By: Malte Ubl <malte@vercel.com> PR-URL: #61807 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gürgün Dayıoğlu <hey@gurgun.day> Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
1 parent 5a8f845 commit 199daab

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
'use strict';
2+
const common = require('../common.js');
3+
const { ReadableStream } = require('node:stream/web');
4+
5+
// Benchmark for reading from a pre-buffered ReadableStream.
6+
// This measures the fast path optimization where data is already
7+
// queued in the controller, avoiding DefaultReadRequest allocation.
8+
9+
const bench = common.createBenchmark(main, {
10+
n: [1e5],
11+
bufferSize: [1, 10, 100, 1000],
12+
});
13+
14+
async function main({ n, bufferSize }) {
15+
let enqueued = 0;
16+
17+
const rs = new ReadableStream({
18+
start(controller) {
19+
// Pre-fill the buffer
20+
for (let i = 0; i < bufferSize; i++) {
21+
controller.enqueue('a');
22+
enqueued++;
23+
}
24+
},
25+
pull(controller) {
26+
// Refill buffer when pulled
27+
const toEnqueue = Math.min(bufferSize, n - enqueued);
28+
for (let i = 0; i < toEnqueue; i++) {
29+
controller.enqueue('a');
30+
enqueued++;
31+
}
32+
if (enqueued >= n) {
33+
controller.close();
34+
}
35+
},
36+
}, {
37+
// Use buffer size as high water mark to allow pre-buffering
38+
highWaterMark: bufferSize,
39+
});
40+
41+
const reader = rs.getReader();
42+
let x = null;
43+
let reads = 0;
44+
45+
bench.start();
46+
while (reads < n) {
47+
const { value, done } = await reader.read();
48+
if (done) break;
49+
x = value;
50+
reads++;
51+
}
52+
bench.end(reads);
53+
console.assert(x);
54+
}

lib/internal/webstreams/readablestream.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,31 @@ class ReadableStreamDefaultReader {
860860
new ERR_INVALID_STATE.TypeError(
861861
'The reader is not attached to a stream'));
862862
}
863+
864+
const stream = this[kState].stream;
865+
const controller = stream[kState].controller;
866+
867+
// Fast path: if data is already buffered in a default controller,
868+
// return a resolved promise immediately without creating a read request.
869+
// This is spec-compliant because read() returns a Promise, and
870+
// Promise.resolve() callbacks still run in the microtask queue.
871+
if (stream[kState].state === 'readable' &&
872+
isReadableStreamDefaultController(controller) &&
873+
controller[kState].queue.length > 0) {
874+
stream[kState].disturbed = true;
875+
const chunk = dequeueValue(controller);
876+
877+
if (controller[kState].closeRequested && !controller[kState].queue.length) {
878+
readableStreamDefaultControllerClearAlgorithms(controller);
879+
readableStreamClose(stream);
880+
} else {
881+
readableStreamDefaultControllerCallPullIfNeeded(controller);
882+
}
883+
884+
return PromiseResolve({ value: chunk, done: false });
885+
}
886+
887+
// Slow path: create request and go through normal flow
863888
const readRequest = new DefaultReadRequest();
864889
readableStreamDefaultReaderRead(this, readRequest);
865890
return readRequest.promise;
@@ -1286,6 +1311,8 @@ const isReadableStream =
12861311
isBrandCheck('ReadableStream');
12871312
const isReadableByteStreamController =
12881313
isBrandCheck('ReadableByteStreamController');
1314+
const isReadableStreamDefaultController =
1315+
isBrandCheck('ReadableStreamDefaultController');
12891316
const isReadableStreamBYOBRequest =
12901317
isBrandCheck('ReadableStreamBYOBRequest');
12911318
const isReadableStreamDefaultReader =
@@ -1510,6 +1537,57 @@ function readableStreamPipeTo(
15101537

15111538
await writer[kState].ready.promise;
15121539

1540+
const controller = source[kState].controller;
1541+
1542+
// Fast path: batch reads when data is buffered in a default controller.
1543+
// This avoids creating PipeToReadableStreamReadRequest objects and
1544+
// reduces promise allocation overhead.
1545+
if (source[kState].state === 'readable' &&
1546+
isReadableStreamDefaultController(controller) &&
1547+
controller[kState].queue.length > 0) {
1548+
1549+
while (controller[kState].queue.length > 0) {
1550+
if (shuttingDown) return true;
1551+
1552+
source[kState].disturbed = true;
1553+
const chunk = dequeueValue(controller);
1554+
1555+
if (controller[kState].closeRequested && !controller[kState].queue.length) {
1556+
readableStreamDefaultControllerClearAlgorithms(controller);
1557+
readableStreamClose(source);
1558+
}
1559+
1560+
// Write the chunk - we're already in a separate microtask from enqueue
1561+
// because we awaited writer[kState].ready.promise above
1562+
state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
1563+
setPromiseHandled(state.currentWrite);
1564+
1565+
// Check backpressure after each write
1566+
if (dest[kState].state === 'writable') {
1567+
const desiredSize = writer.desiredSize;
1568+
if (desiredSize !== null && desiredSize <= 0) {
1569+
// Backpressure - stop batch and wait for ready
1570+
break;
1571+
}
1572+
}
1573+
}
1574+
1575+
// Trigger pull if needed after batch
1576+
if (source[kState].state === 'readable' &&
1577+
!controller[kState].closeRequested) {
1578+
readableStreamDefaultControllerCallPullIfNeeded(controller);
1579+
}
1580+
1581+
// Check if stream closed during batch
1582+
if (source[kState].state === 'closed') {
1583+
return true;
1584+
}
1585+
1586+
// Yield to microtask queue between batches to allow events/signals to fire
1587+
return false;
1588+
}
1589+
1590+
// Slow path: use read request for async reads
15131591
const promise = PromiseWithResolvers();
15141592
// eslint-disable-next-line no-use-before-define
15151593
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));

0 commit comments

Comments
 (0)