diff --git a/lib/internal/streams/duplexpair.js b/lib/internal/streams/duplexpair.js index a32084c4d4cbdf..b2484639cd1903 100644 --- a/lib/internal/streams/duplexpair.js +++ b/lib/internal/streams/duplexpair.js @@ -50,6 +50,30 @@ class DuplexSide extends Duplex { this.#otherSide.on('end', callback); this.#otherSide.push(null); } + + + _destroy(err, callback) { + const otherSide = this.#otherSide; + + if (otherSide !== null && !otherSide.destroyed) { + // Use nextTick to avoid crashing the current execution stack (like HTTP parser) + process.nextTick(() => { + if (otherSide.destroyed) return; + + if (err) { + // Destroy the other side, without passing the 'err' object. + // This closes the other side gracefully so it doesn't hang, + // but prevents the "Unhandled error" crash. + otherSide.destroy(); + } else { + // Standard graceful close + otherSide.push(null); + } + }); + } + + callback(err); + } } function duplexPair(options) { @@ -57,6 +81,6 @@ function duplexPair(options) { const side1 = new DuplexSide(options); side0[kInitOtherSide](side1); side1[kInitOtherSide](side0); - return [ side0, side1 ]; + return [side0, side1]; } module.exports = duplexPair; diff --git a/test/parallel/test-duplex-error.js b/test/parallel/test-duplex-error.js new file mode 100644 index 00000000000000..cacdb62d5fda9d --- /dev/null +++ b/test/parallel/test-duplex-error.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { duplexPair } = require('stream'); + +const [sideA, sideB] = duplexPair(); + +// Side A should receive the error because we called .destroy(err) on it. +sideA.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'Simulated error'); +})); + +// Side B should NOT necessarily emit an error (to avoid crashing +// existing code), but it MUST be destroyed. +sideB.on('error', common.mustNotCall('Side B should not emit an error event')); + +sideB.on('close', common.mustCall(() => { + assert.strictEqual(sideB.destroyed, true); +})); + +sideA.resume(); +sideB.resume(); + +// Trigger the destruction +sideA.destroy(new Error('Simulated error')); + +// Check the state in the next tick to allow nextTick/microtasks to run +setImmediate(common.mustCall(() => { + assert.strictEqual(sideA.destroyed, true); + assert.strictEqual(sideB.destroyed, true); +}));