Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -636,9 +636,14 @@ function howMuchToRead(n, state) {
if ((state[kState] & kObjectMode) !== 0)
return 1;
if (NumberIsNaN(n)) {
// Fast path for buffers.
if ((state[kState] & kDecoder) === 0 && state.length)
return state.buffer[state.bufferIndex].length;

// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer[state.bufferIndex].length;

return state.length;
}
if (n <= state.length)
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-crypto-cipheriv-decipheriv.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ function testCipher1(key, iv) {
// quite small, so there's no harm.
const cStream = crypto.createCipheriv('des-ede3-cbc', key, iv);
cStream.end(plaintext);
ciph = cStream.read();
ciph = cStream.read(cStream.readableLength);

const dStream = crypto.createDecipheriv('des-ede3-cbc', key, iv);
dStream.end(ciph);
txt = dStream.read().toString('utf8');
txt = dStream.read(dStream.readableLength).toString('utf8');

assert.strictEqual(txt, plaintext,
`streaming cipher with key ${key} and iv ${iv}`);
Expand Down
34 changes: 16 additions & 18 deletions test/parallel/test-runner-run.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as fixtures from '../common/fixtures.mjs';
import { join } from 'node:path';
import { describe, it, run } from 'node:test';
import { dot, spec, tap } from 'node:test/reporters';
import consumers from 'node:stream/consumers';
import assert from 'node:assert';
import util from 'node:util';

Expand Down Expand Up @@ -111,34 +112,31 @@ describe('require(\'node:test\').run', { concurrency: true }, () => {
describe('should be piped with spec reporter', () => {
it('new spec', async () => {
const specReporter = new spec();
const result = await run({
const result = await consumers.text(run({
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
}).compose(specReporter).toArray();
const stringResults = result.map((bfr) => bfr.toString());
assert.match(stringResults[0], /this should pass/);
assert.match(stringResults[1], /tests 1/);
assert.match(stringResults[1], /pass 1/);
}).compose(specReporter));
assert.match(result, /this should pass/);
assert.match(result, /tests 1/);
assert.match(result, /pass 1/);
});

it('spec()', async () => {
const specReporter = spec();
const result = await run({
const result = await consumers.text(run({
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
}).compose(specReporter).toArray();
const stringResults = result.map((bfr) => bfr.toString());
assert.match(stringResults[0], /this should pass/);
assert.match(stringResults[1], /tests 1/);
assert.match(stringResults[1], /pass 1/);
}).compose(specReporter));
assert.match(result, /this should pass/);
assert.match(result, /tests 1/);
assert.match(result, /pass 1/);
});

it('spec', async () => {
const result = await run({
const result = await consumers.text(run({
files: [join(testFixtures, 'default-behavior/test/random.cjs')]
}).compose(spec).toArray();
const stringResults = result.map((bfr) => bfr.toString());
assert.match(stringResults[0], /this should pass/);
assert.match(stringResults[1], /tests 1/);
assert.match(stringResults[1], /pass 1/);
}).compose(spec));
assert.match(result, /this should pass/);
assert.match(result, /tests 1/);
assert.match(result, /pass 1/);
});
});

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ const assert = require('assert');

newStream.end();

assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve Rogers'), Buffer.from('On your left')]);
})().then(common.mustCall());
}

Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-push-strings.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ ms.on('readable', function() {
results.push(String(chunk));
});

const expect = [ 'first chunksecond to last chunk', 'last chunk' ];
const expect = [ 'first chunk', 'second to last chunk', 'last chunk' ];
process.on('exit', function() {
assert.strictEqual(ms._chunks, -1);
assert.deepStrictEqual(results, expect);
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-emittedReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const readable = new Readable({
// Initialized to false.
assert.strictEqual(readable._readableState.emittedReadable, false);

const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
const expected = [Buffer.from('foo'), Buffer.from('bar'), Buffer.from('quo'), null];
readable.on('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(readable._readableState.emittedReadable, true);
Expand Down
14 changes: 3 additions & 11 deletions test/parallel/test-stream-readable-infinite-read.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,16 @@ const readable = new Readable({
highWaterMark: 16 * 1024,
read: common.mustCall(function() {
this.push(buf);
}, 31)
}, 12)
});

let i = 0;

readable.on('readable', common.mustCall(function() {
if (i++ === 10) {
// We will just terminate now.
process.removeAllListeners('readable');
readable.removeAllListeners('readable');
return;
}

const data = readable.read();
// TODO(mcollina): there is something odd in the highWaterMark logic
// investigate.
if (i === 1) {
assert.strictEqual(data.length, 8192 * 2);
} else {
assert.strictEqual(data.length, 8192 * 3);
}
assert.strictEqual(readable.read().length, 8192);
}, 11));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-needReadable.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const asyncReadable = new Readable({
});

asyncReadable.on('readable', common.mustCall(() => {
if (asyncReadable.read() !== null) {
if (asyncReadable.read(asyncReadable.readableLength) !== null) {
// After each read(), the buffer is empty.
// If the stream doesn't end now,
// then we need to notify the reader on future changes.
Expand Down
20 changes: 20 additions & 0 deletions test/parallel/test-stream-readable-readable-one.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict';
const common = require('../common');

Check failure on line 2 in test/parallel/test-stream-readable-readable-one.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

'common' is assigned a value but never used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const common = require('../common');
require('../common');

I would also rename the file to something like test-stream-readable-read-one.js

const assert = require('assert');

const { Readable } = require('stream');

// Read one buffer at a time and don't waste cycles allocating
// and copying into a new larger buffer.
{
const r = new Readable({
read() {}
});
const buffers = [Buffer.allocUnsafe(5), Buffer.allocUnsafe(10)];
for (const buf of buffers) {
r.push(buf);
}
for (const buf of buffers) {
assert.strictEqual(r.read(), buf);
}
}
14 changes: 12 additions & 2 deletions test/parallel/test-stream-typedarray.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,19 @@ const views = common.getArrayBufferViews(buffer);
readable.push(views[2]);
readable.unshift(views[0]);

const buf = readable.read();
let buf;

buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...views[0]]);

buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...views[1]]);

buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]);
assert.deepStrictEqual([...buf], [...views[2]]);
}

{
Expand Down
10 changes: 8 additions & 2 deletions test/parallel/test-stream-uint8array.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,15 @@ const GHI = new Uint8Array([0x47, 0x48, 0x49]);
readable.push(DEF);
readable.unshift(ABC);

const buf = readable.read();
let buf;

buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...ABC]);

buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
assert.deepStrictEqual([...buf], [...DEF]);
}

{
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ const { PassThrough, Transform } = require('stream');
pt.write(Buffer.from('ef'), common.mustCall(function() {
pt.end();
}));
assert.strictEqual(pt.read().toString(), 'abcdef');
assert.strictEqual(pt.read().toString(), 'abc');
assert.strictEqual(pt.read().toString(), 'd');
assert.strictEqual(pt.read().toString(), 'ef');
assert.strictEqual(pt.read(), null);
});
});
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-webstreams-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ const http = require('http');
});

pipeline(r, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['helloworld']);
assert.deepStrictEqual(values, ['hello', 'world']);
}));

r.push('hello');
Expand Down Expand Up @@ -181,7 +181,7 @@ const http = require('http');
});

pipeline(rs, t, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLOWORLD']);
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));

c.enqueue('hello');
Expand Down
3 changes: 2 additions & 1 deletion test/parallel/test-worker-heap-snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const { once } = require('events');
const worker = new Worker('setInterval(() => {}, 1000);', { eval: true });
await once(worker, 'online');
const stream = await worker.getHeapSnapshot();
assert.ok(JSON.parse(stream.read()));
stream.read(0); // Trigger the stream to start flowing
assert.ok(JSON.parse(stream.read(stream.readableLength)));

await worker.terminate();
})().then(common.mustCall());
2 changes: 1 addition & 1 deletion test/parallel/test-worker-stdio-from-preload-module.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ for (let i = 0; i < 10; i++) {
stdout: true
});
w.on('exit', common.mustCall(() => {
assert.strictEqual(w.stdout.read().toString(), 'A\nB\n');
assert.strictEqual(w.stdout.read(w.stdout.readableLength).toString(), 'A\nB\n');
}));
}
4 changes: 2 additions & 2 deletions test/parallel/test-zlib-flush-write-sync-interleaved.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ for (const chunk of ['abc', 'def', 'ghi']) {
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
events.push('flushed');
const chunk = compress.read();
const chunk = compress.read(compress.readableLength);
if (chunk !== null)
compressedChunks.push(chunk);
}));
Expand All @@ -36,7 +36,7 @@ function writeToDecompress() {
const chunk = compressedChunks.shift();
if (chunk === undefined) return decompress.end();
decompress.write(chunk, common.mustCall(() => {
events.push({ read: decompress.read() });
events.push({ read: decompress.read(decompress.readableLength) });
writeToDecompress();
}));
}
Expand Down
Loading