Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions test/base-queue-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ describe('Test baseQueueHandler', function () {
DemoHandler.prototype.afterDlq = originalAfterDlq;
});

it('should add string to dlq because afterDlq throws error', async function () {
sandbox.useFakeTimers();
it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () {
const handler = new DemoHandler(this.name, rabbit, {
retries: 2,
retryDelay: 100
retryDelay: 10
});
await handler.created;
handler.handle = sandbox.spy(() => {
throw new Error('test error');
});
Expand All @@ -198,18 +198,28 @@ describe('Test baseQueueHandler', function () {
throw new Error('test error');
}
}));

const dlqMessages: any[] = [];
await rabbit.subscribe(this.name + '_dlq', (msg, ack) => {
dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content });
ack(null);
});

await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' });
const publish = (handler.rabbit.publish = sandbox.spy(handler.rabbit, 'publish'));
sandbox.clock.tick(100);
await rabbit.connected;
sandbox.clock.tick(100);
await rabbit.connected;
sandbox.clock.tick(100);
sandbox.clock.restore();
await new Promise(resolve => setTimeout(resolve, 400));
await new Promise(resolve => setTimeout(resolve, 300));

afterDlq.calledOnce.should.be.true();
publish.calledTwice.should.be.true();
publish.args[publish.callCount - 1][1].should.eql('{"test":"data"}');
const fallbackPayload = publish.args[publish.callCount - 1][1];
Buffer.isBuffer(fallbackPayload).should.be.true();
fallbackPayload.toString().should.eql('{"test":"data"}');

// TODO: This flow also produces duplicate messages on the DLQ,
// to be investigated and handled on the next PR
dlqMessages.length.should.equal(2);
dlqMessages[0].event.should.eql({ test: 'data' });
dlqMessages[1].event.should.eql({ test: 'data' });
});

it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () {
Expand Down
6 changes: 3 additions & 3 deletions ts/base-queue-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,16 @@ abstract class BaseQueueHandler {
}

async addToDLQ(retries, msg: amqp.Message, ack) {
const correlationId = this.getCorrelationId(msg);
try {
const correlationId = this.getCorrelationId(msg);
const event = decode(msg);
this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`);
await this.rabbit.publish(this.dlqName, event, msg.properties);
const response = await this.afterDlq({ msg, event });
ack(msg.properties.headers.errors.message, response);
} catch (err) {
this.logger.error(err);
await this.rabbit.publish(this.dlqName, msg.content.toString(), msg.properties);
this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err);
await this.rabbit.publish(this.dlqName, msg.content, msg.properties);
ack(err.message, null);
}
}
Expand Down
3 changes: 3 additions & 0 deletions ts/encode-decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export function encode(message: Buffer | string | Object = '', contentType = 'application/json') {
if (Buffer.isBuffer(message)) {
return message;
}
if (contentType === 'application/json') {
return Buffer.from(JSON.stringify(message));
}
Expand Down