From 4745f89ef1dd8fd1cad6e199e105c804d46aaad4 Mon Sep 17 00:00:00 2001 From: Dimitris Ilias Date: Thu, 7 May 2026 16:56:31 +0300 Subject: [PATCH] prevent double-stringify of dlq messages on catch path --- test/base-queue-handler.test.ts | 32 +++++++++++++++++++++----------- ts/base-queue-handler.ts | 6 +++--- ts/encode-decode.ts | 3 +++ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/test/base-queue-handler.test.ts b/test/base-queue-handler.test.ts index d1d6528..584517d 100644 --- a/test/base-queue-handler.test.ts +++ b/test/base-queue-handler.test.ts @@ -183,12 +183,12 @@ describe('Test baseQueueHandler', function () { DemoHandler.prototype.afterDlq = originalAfterDlq; }); - it('should add string to dlq because afterDlq throws error', async function () { - sandbox.useFakeTimers(); + it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () { const handler = new DemoHandler(this.name, rabbit, { retries: 2, - retryDelay: 100 + retryDelay: 10 }); + await handler.created; handler.handle = sandbox.spy(() => { throw new Error('test error'); }); @@ -198,18 +198,28 @@ describe('Test baseQueueHandler', function () { throw new Error('test error'); } })); + + const dlqMessages: any[] = []; + await rabbit.subscribe(this.name + '_dlq', (msg, ack) => { + dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content }); + ack(null); + }); + await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' }); const publish = (handler.rabbit.publish = sandbox.spy(handler.rabbit, 'publish')); - sandbox.clock.tick(100); - await rabbit.connected; - sandbox.clock.tick(100); - await rabbit.connected; - sandbox.clock.tick(100); - sandbox.clock.restore(); - await new Promise(resolve => setTimeout(resolve, 400)); + await new Promise(resolve => setTimeout(resolve, 300)); + afterDlq.calledOnce.should.be.true(); publish.calledTwice.should.be.true(); - publish.args[publish.callCount - 1][1].should.eql('{"test":"data"}'); + const fallbackPayload = publish.args[publish.callCount - 1][1]; + Buffer.isBuffer(fallbackPayload).should.be.true(); + fallbackPayload.toString().should.eql('{"test":"data"}'); + + // TODO: This flow also produces duplicate messages on the DLQ, + // to be investigated and handled on the next PR + dlqMessages.length.should.equal(2); + dlqMessages[0].event.should.eql({ test: 'data' }); + dlqMessages[1].event.should.eql({ test: 'data' }); }); it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () { diff --git a/ts/base-queue-handler.ts b/ts/base-queue-handler.ts index acdce54..68c4fcd 100644 --- a/ts/base-queue-handler.ts +++ b/ts/base-queue-handler.ts @@ -182,16 +182,16 @@ abstract class BaseQueueHandler { } async addToDLQ(retries, msg: amqp.Message, ack) { + const correlationId = this.getCorrelationId(msg); try { - const correlationId = this.getCorrelationId(msg); const event = decode(msg); this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`); await this.rabbit.publish(this.dlqName, event, msg.properties); const response = await this.afterDlq({ msg, event }); ack(msg.properties.headers.errors.message, response); } catch (err) { - this.logger.error(err); - await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties); + this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err); + await this.rabbit.publish(this.dlqName, msg.content, msg.properties); ack(err.message, null); } } diff --git a/ts/encode-decode.ts b/ts/encode-decode.ts index d1430b3..8b8c8f4 100644 --- a/ts/encode-decode.ts +++ b/ts/encode-decode.ts @@ -1,4 +1,7 @@ export function encode(message: Buffer | string | Object = '', contentType = 'application/json') { + if (Buffer.isBuffer(message)) { + return message; + } if (contentType === 'application/json') { return Buffer.from(JSON.stringify(message)); }