Skip to content

Commit 337420c

Browse files
committed
test: add release/reclaim dispatch index tests and legacy migration tests
Tests that release (retry) and reclaim (visibility timeout) correctly update dispatch indexes instead of master queue. Also tests that legacy pre-deploy messages in the old master queue migrate to the new dispatch index when they get reclaimed or retried.
1 parent 6052e63 commit 337420c

File tree

1 file changed

+247
-0
lines changed

1 file changed

+247
-0
lines changed

packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
FairQueue,
77
DefaultFairQueueKeyProducer,
88
DRRScheduler,
9+
ExponentialBackoffRetry,
910
WorkerQueueManager,
1011
} from "../index.js";
1112
import type { FairQueueKeyProducer, StoredMessage } from "../types.js";
@@ -38,6 +39,9 @@ class TestHelper {
3839
shardCount?: number;
3940
consumerIntervalMs?: number;
4041
concurrencyLimit?: number;
42+
visibilityTimeoutMs?: number;
43+
reclaimIntervalMs?: number;
44+
retry?: { maxAttempts: number; delayMs: number };
4145
} = {}
4246
) {
4347
const scheduler = new DRRScheduler({
@@ -54,8 +58,20 @@ class TestHelper {
5458
payloadSchema: TestPayloadSchema,
5559
shardCount: options.shardCount ?? 1,
5660
consumerIntervalMs: options.consumerIntervalMs ?? 20,
61+
visibilityTimeoutMs: options.visibilityTimeoutMs,
62+
reclaimIntervalMs: options.reclaimIntervalMs,
5763
startConsumers: false,
5864
workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID },
65+
retry: options.retry
66+
? {
67+
strategy: new ExponentialBackoffRetry({
68+
maxAttempts: options.retry.maxAttempts,
69+
baseDelay: options.retry.delayMs,
70+
maxDelay: options.retry.delayMs,
71+
factor: 1,
72+
}),
73+
}
74+
: undefined,
5975
concurrencyGroups: options.concurrencyLimit
6076
? [
6177
{
@@ -597,6 +613,237 @@ describe("Two-Level Tenant Dispatch", () => {
597613
}
598614
);
599615
});
616+
describe("release updates dispatch indexes", () => {
617+
redisTest(
618+
"should update dispatch indexes when message is released for retry",
619+
{ timeout: 20000 },
620+
async ({ redisOptions }) => {
621+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
622+
const redis = createRedisClient(redisOptions);
623+
const attempts: number[] = [];
624+
625+
const helper = new TestHelper(redisOptions, keys, {
626+
retry: { maxAttempts: 3, delayMs: 100 },
627+
});
628+
629+
await helper.fairQueue.enqueue({
630+
queueId: "tenant:t1:queue:q1",
631+
tenantId: "t1",
632+
payload: { value: "retry-me" },
633+
});
634+
635+
helper.onMessage(async (ctx) => {
636+
attempts.push(ctx.message.attempt);
637+
if (ctx.message.attempt < 2) {
638+
// Fail on first attempt to trigger retry
639+
await ctx.fail(new Error("transient error"));
640+
} else {
641+
await ctx.complete();
642+
}
643+
});
644+
helper.start();
645+
646+
// Wait for successful processing on second attempt
647+
await waitFor(() => attempts.length >= 2 && attempts.includes(2), 10000);
648+
649+
// After retry, the message went back into the queue via releaseMessage Lua.
650+
// Verify it was picked up again (attempt 2 processed).
651+
expect(attempts).toContain(1);
652+
expect(attempts).toContain(2);
653+
654+
// After completion, dispatch indexes should be cleaned up
655+
await waitFor(async () => {
656+
const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1"));
657+
return t1Queues === 0;
658+
}, 5000);
659+
660+
await helper.close();
661+
await redis.quit();
662+
}
663+
);
664+
});
665+
666+
describe("reclaim updates dispatch indexes", () => {
667+
redisTest(
668+
"should update dispatch indexes when timed-out message is reclaimed",
669+
{ timeout: 20000 },
670+
async ({ redisOptions }) => {
671+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
672+
const redis = createRedisClient(redisOptions);
673+
const processCount = { count: 0 };
674+
675+
const helper = new TestHelper(redisOptions, keys, {
676+
visibilityTimeoutMs: 500,
677+
reclaimIntervalMs: 200,
678+
});
679+
680+
await helper.fairQueue.enqueue({
681+
queueId: "tenant:t1:queue:q1",
682+
tenantId: "t1",
683+
payload: { value: "reclaim-me" },
684+
});
685+
686+
helper.onMessage(async (ctx) => {
687+
processCount.count++;
688+
if (processCount.count === 1) {
689+
// First attempt: don't complete, let it timeout and get reclaimed
690+
await new Promise((resolve) => setTimeout(resolve, 1500));
691+
} else {
692+
// Second attempt after reclaim: complete normally
693+
await ctx.complete();
694+
}
695+
});
696+
helper.start();
697+
698+
// Wait for message to be processed twice (once timeout, once success)
699+
await waitFor(() => processCount.count >= 2, 10000);
700+
701+
// After reclaim + re-processing + completion, indexes should be clean
702+
await waitFor(async () => {
703+
const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1"));
704+
return t1Queues === 0;
705+
}, 5000);
706+
707+
await helper.close();
708+
await redis.quit();
709+
}
710+
);
711+
});
712+
713+
describe("legacy message migration via reclaim", () => {
714+
redisTest(
715+
"should migrate legacy master queue message to dispatch index on reclaim",
716+
{ timeout: 20000 },
717+
async ({ redisOptions }) => {
718+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
719+
const redis = createRedisClient(redisOptions);
720+
const processed: string[] = [];
721+
722+
// Simulate pre-deploy: write message to old master queue + queue storage
723+
const queueId = "tenant:t1:queue:legacy";
724+
const queueKey = keys.queueKey(queueId);
725+
const queueItemsKey = keys.queueItemsKey(queueId);
726+
const masterQueueKey = keys.masterQueueKey(0);
727+
728+
const timestamp = Date.now();
729+
const storedMessage: StoredMessage<TestPayload> = {
730+
id: "legacy-reclaim-1",
731+
queueId,
732+
tenantId: "t1",
733+
payload: { value: "legacy-reclaim" },
734+
timestamp,
735+
attempt: 1,
736+
};
737+
738+
await redis.zadd(queueKey, timestamp, "legacy-reclaim-1");
739+
await redis.hset(queueItemsKey, "legacy-reclaim-1", JSON.stringify(storedMessage));
740+
await redis.zadd(masterQueueKey, timestamp, queueId);
741+
742+
// Verify: message only in old master queue, not in dispatch
743+
expect(await redis.zcard(keys.dispatchKey(0))).toBe(0);
744+
expect(await redis.zcard(keys.tenantQueueIndexKey("t1"))).toBe(0);
745+
746+
// Create FairQueue with short visibility timeout
747+
const helper = new TestHelper(redisOptions, keys, {
748+
visibilityTimeoutMs: 500,
749+
reclaimIntervalMs: 200,
750+
});
751+
752+
const processCount = { count: 0 };
753+
helper.onMessage(async (ctx) => {
754+
processCount.count++;
755+
if (processCount.count === 1) {
756+
// First attempt: don't complete, let it timeout
757+
// The reclaim will put it back in queue and update dispatch indexes
758+
await new Promise((resolve) => setTimeout(resolve, 1500));
759+
} else {
760+
// Second attempt: complete
761+
processed.push(ctx.message.payload.value);
762+
await ctx.complete();
763+
}
764+
});
765+
helper.start();
766+
767+
// Wait for the message to be processed (first via drain, then reclaimed into dispatch)
768+
await waitFor(() => processed.length === 1, 15000);
769+
expect(processed[0]).toBe("legacy-reclaim");
770+
771+
// After completion, both old and new indexes should be clean
772+
const masterAfter = await redis.zcard(masterQueueKey);
773+
const dispatchAfter = await redis.zcard(keys.dispatchKey(0));
774+
const tenantQueuesAfter = await redis.zcard(keys.tenantQueueIndexKey("t1"));
775+
776+
// Old master queue should still be empty (drain removed it)
777+
// or at least the queue itself should be gone
778+
expect(tenantQueuesAfter).toBe(0);
779+
780+
await helper.close();
781+
await redis.quit();
782+
}
783+
);
784+
785+
redisTest(
786+
"should migrate legacy message to dispatch index on retry failure",
787+
{ timeout: 20000 },
788+
async ({ redisOptions }) => {
789+
const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
790+
const redis = createRedisClient(redisOptions);
791+
const attempts: number[] = [];
792+
793+
// Simulate pre-deploy: write message to old master queue
794+
const queueId = "tenant:t1:queue:legacy";
795+
const queueKey = keys.queueKey(queueId);
796+
const queueItemsKey = keys.queueItemsKey(queueId);
797+
const masterQueueKey = keys.masterQueueKey(0);
798+
799+
const timestamp = Date.now();
800+
const storedMessage: StoredMessage<TestPayload> = {
801+
id: "legacy-retry-1",
802+
queueId,
803+
tenantId: "t1",
804+
payload: { value: "legacy-retry" },
805+
timestamp,
806+
attempt: 1,
807+
};
808+
809+
await redis.zadd(queueKey, timestamp, "legacy-retry-1");
810+
await redis.hset(queueItemsKey, "legacy-retry-1", JSON.stringify(storedMessage));
811+
await redis.zadd(masterQueueKey, timestamp, queueId);
812+
813+
// Create FairQueue with retry enabled
814+
const helper = new TestHelper(redisOptions, keys, {
815+
retry: { maxAttempts: 3, delayMs: 100 },
816+
});
817+
818+
helper.onMessage(async (ctx) => {
819+
attempts.push(ctx.message.attempt);
820+
if (ctx.message.attempt < 2) {
821+
// Fail first attempt — triggers retry which writes to dispatch index
822+
await ctx.fail(new Error("transient"));
823+
} else {
824+
await ctx.complete();
825+
}
826+
});
827+
helper.start();
828+
829+
// Wait for retry to complete
830+
await waitFor(() => attempts.includes(2), 10000);
831+
832+
// The retry release should have written to dispatch indexes.
833+
// After completion, indexes should be clean.
834+
await waitFor(async () => {
835+
const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1"));
836+
return t1Queues === 0;
837+
}, 5000);
838+
839+
expect(attempts).toContain(1);
840+
expect(attempts).toContain(2);
841+
842+
await helper.close();
843+
await redis.quit();
844+
}
845+
);
846+
});
600847
});
601848

602849
// Helper to wait for a condition

0 commit comments

Comments
 (0)