Skip to content

Commit ada6916

Browse files
committed
stream: sync resume on drain
Inside drain context we are already async and don't need to defer resume.
1 parent 2030fd3 commit ada6916

File tree

2 files changed

+38
-11
lines changed

2 files changed

+38
-11
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable, Writable } = require('stream');
5+
6+
const bench = common.createBenchmark(main, {
7+
n: [5e6],
8+
});
9+
10+
function main({ n }) {
11+
const b = Buffer.alloc(1024);
12+
const r = new Readable();
13+
const w = new Writable();
14+
15+
let i = 0;
16+
17+
r._read = () => r.push(i++ === n ? null : b);
18+
w._write = (data, enc, cb) => queueMicrotask(cb);
19+
20+
bench.start();
21+
22+
r.pipe(w);
23+
w.on('finish', () => bench.end(n));
24+
}

lib/internal/streams/readable.js

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ function pipeOnDrain(src, dest) {
10911091

10921092
if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
10931093
(state[kState] & kDataListening) !== 0) {
1094-
src.resume();
1094+
resume(stream, state, dest.listenerCount('drain') <= 1);
10951095
}
10961096
};
10971097
}
@@ -1232,7 +1232,10 @@ function nReadingNextTick(self) {
12321232
// pause() and resume() are remnants of the legacy readable stream API
12331233
// If the user uses them, then switch into old mode.
12341234
Readable.prototype.resume = function() {
1235-
const state = this._readableState;
1235+
return resume(this, this._readableState, false);
1236+
};
1237+
1238+
function resume(stream, state, sync) {
12361239
if ((state[kState] & kFlowing) === 0) {
12371240
debug('resume');
12381241
// We flow only if there is no one listening
@@ -1244,18 +1247,18 @@ Readable.prototype.resume = function() {
12441247
} else {
12451248
state[kState] &= ~kFlowing;
12461249
}
1247-
resume(this, state);
1250+
if ((state[kState] & kResumeScheduled) === 0) {
1251+
if (sync) {
1252+
resume_(stream, state, false);
1253+
} else {
1254+
state[kState] |= kResumeScheduled;
1255+
process.nextTick(resume_, stream, state);
1256+
}
1257+
}
12481258
}
12491259
state[kState] |= kHasPaused;
12501260
state[kState] &= ~kPaused;
1251-
return this;
1252-
};
1253-
1254-
function resume(stream, state) {
1255-
if ((state[kState] & kResumeScheduled) === 0) {
1256-
state[kState] |= kResumeScheduled;
1257-
process.nextTick(resume_, stream, state);
1258-
}
1261+
return stream;
12591262
}
12601263

12611264
function resume_(stream, state) {

0 commit comments

Comments
 (0)