Skip to content

Commit a7c1b4a

Browse files
committed
fix: update visibility manager release/reclaim to use dispatch indexes
The releaseMessage and releaseMessageBatch Lua scripts were still writing to the old master queue shards. This commit updates them to write to the new dispatch indexes instead, so that released and reclaimed messages are added to the new two-level index atomically. It also removes the legacyDrainComplete flag in favor of checking ZCARD on each iteration (an O(1) operation), and removes the redundant legacyDrainComplete OpenTelemetry metric, since master_queue.length already shows the drain status.
1 parent e1604f7 commit a7c1b4a

File tree

2 files changed

+95
-93
lines changed

2 files changed

+95
-93
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 29 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
10941094
): Promise<number> {
10951095
const queueKey = this.keys.queueKey(queueId);
10961096
const queueItemsKey = this.keys.queueItemsKey(queueId);
1097-
const masterQueueKey = this.keys.masterQueueKey(shardId);
10981097
const descriptor = this.queueDescriptorCache.get(queueId) ?? {
10991098
id: queueId,
11001099
tenantId,
@@ -1153,12 +1152,16 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11531152
if (!reserved) {
11541153
// Release ALL remaining messages (from index i onward) back to queue
11551154
// This prevents messages from being stranded in the in-flight set
1155+
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId);
1156+
const dispatchKey = this.keys.dispatchKey(shardId);
11561157
await this.visibilityManager.releaseBatch(
11571158
claimedMessages.slice(i),
11581159
queueId,
11591160
queueKey,
11601161
queueItemsKey,
1161-
masterQueueKey
1162+
tenantQueueIndexKey,
1163+
dispatchKey,
1164+
tenantId
11621165
);
11631166
// Stop processing more messages from this queue since we're at capacity
11641167
break;
@@ -1293,7 +1296,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12931296
const shardId = this.masterQueue.getShardForQueue(queueId);
12941297
const queueKey = this.keys.queueKey(queueId);
12951298
const queueItemsKey = this.keys.queueItemsKey(queueId);
1296-
const masterQueueKey = this.keys.masterQueueKey(shardId);
12971299
const inflightDataKey = this.keys.inflightDataKey(shardId);
12981300

12991301
// Get stored message for concurrency release
@@ -1315,13 +1317,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13151317
}
13161318
: { id: queueId, tenantId: "", metadata: {} };
13171319

1318-
// Release back to queue (visibility manager updates old master queue internally)
1320+
// Release back to queue (visibility manager updates dispatch indexes atomically)
1321+
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1322+
const dispatchKey = this.keys.dispatchKey(shardId);
13191323
await this.visibilityManager.release(
13201324
messageId,
13211325
queueId,
13221326
queueKey,
13231327
queueItemsKey,
1324-
masterQueueKey,
1328+
tenantQueueIndexKey,
1329+
dispatchKey,
1330+
descriptor.tenantId,
13251331
Date.now() // Put at back of queue
13261332
);
13271333

@@ -1330,17 +1336,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13301336
await this.concurrencyManager.release(descriptor, messageId);
13311337
}
13321338

1333-
// Update new dispatch indexes (message is back in queue, update scores)
1334-
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1335-
const dispatchKey = this.keys.dispatchKey(shardId);
1336-
await this.redis.updateDispatchIndexes(
1337-
queueKey,
1338-
tenantQueueIndexKey,
1339-
dispatchKey,
1340-
queueId,
1341-
descriptor.tenantId
1342-
);
1343-
13441339
this.logger.debug("Message released", {
13451340
messageId,
13461341
queueId,
@@ -1359,7 +1354,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13591354
const shardId = this.masterQueue.getShardForQueue(queueId);
13601355
const queueKey = this.keys.queueKey(queueId);
13611356
const queueItemsKey = this.keys.queueItemsKey(queueId);
1362-
const masterQueueKey = this.keys.masterQueueKey(shardId);
13631357
const inflightDataKey = this.keys.inflightDataKey(shardId);
13641358

13651359
// Get stored message
@@ -1391,7 +1385,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13911385
queueId,
13921386
queueKey,
13931387
queueItemsKey,
1394-
masterQueueKey,
13951388
shardId,
13961389
descriptor,
13971390
error
@@ -1407,7 +1400,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14071400
queueId: string,
14081401
queueKey: string,
14091402
queueItemsKey: string,
1410-
masterQueueKey: string,
14111403
shardId: number,
14121404
descriptor: QueueDescriptor,
14131405
error?: Error
@@ -1427,12 +1419,16 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14271419

14281420
// Release with delay, passing the updated message data so the Lua script
14291421
// atomically writes the incremented attempt count when re-queuing.
1422+
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1423+
const dispatchKey = this.keys.dispatchKey(shardId);
14301424
await this.visibilityManager.release(
14311425
storedMessage.id,
14321426
queueId,
14331427
queueKey,
14341428
queueItemsKey,
1435-
masterQueueKey,
1429+
tenantQueueIndexKey,
1430+
dispatchKey,
1431+
descriptor.tenantId,
14361432
Date.now() + nextDelay,
14371433
JSON.stringify(updatedMessage)
14381434
);
@@ -1442,17 +1438,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14421438
await this.concurrencyManager.release(descriptor, storedMessage.id);
14431439
}
14441440

1445-
// Update dispatch indexes (message is back in queue with delay)
1446-
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
1447-
const dispatchKey = this.keys.dispatchKey(shardId);
1448-
await this.redis.updateDispatchIndexes(
1449-
queueKey,
1450-
tenantQueueIndexKey,
1451-
dispatchKey,
1452-
queueId,
1453-
descriptor.tenantId
1454-
);
1455-
14561441
this.telemetry.recordRetry();
14571442

14581443
this.logger.debug("Message scheduled for retry", {
@@ -1550,11 +1535,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
15501535
let totalReclaimed = 0;
15511536

15521537
for (let shardId = 0; shardId < this.shardCount; shardId++) {
1553-
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
1554-
queueKey: this.keys.queueKey(queueId),
1555-
queueItemsKey: this.keys.queueItemsKey(queueId),
1556-
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
1557-
}));
1538+
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => {
1539+
const tenantId = this.keys.extractTenantId(queueId);
1540+
const queueShardId = this.masterQueue.getShardForQueue(queueId);
1541+
return {
1542+
queueKey: this.keys.queueKey(queueId),
1543+
queueItemsKey: this.keys.queueItemsKey(queueId),
1544+
tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId),
1545+
dispatchKey: this.keys.dispatchKey(queueShardId),
1546+
tenantId,
1547+
};
1548+
});
15581549

15591550
if (reclaimedMessages.length > 0) {
15601551
// Release concurrency for all reclaimed messages in a single batch
@@ -1580,32 +1571,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
15801571
}
15811572
}
15821573

1583-
// Update dispatch indexes for reclaimed queues (messages are back in queue)
1584-
const updatedQueues = new Set<string>();
1585-
for (const msg of reclaimedMessages) {
1586-
const key = `${msg.tenantId}:${msg.queueId}`;
1587-
if (updatedQueues.has(key)) continue;
1588-
updatedQueues.add(key);
1589-
1590-
try {
1591-
const queueKey = this.keys.queueKey(msg.queueId);
1592-
const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(msg.tenantId);
1593-
const dispatchKey = this.keys.dispatchKey(shardId);
1594-
await this.redis.updateDispatchIndexes(
1595-
queueKey,
1596-
tenantQueueIndexKey,
1597-
dispatchKey,
1598-
msg.queueId,
1599-
msg.tenantId
1600-
);
1601-
} catch (error) {
1602-
this.logger.error("Failed to update dispatch indexes for reclaimed message", {
1603-
queueId: msg.queueId,
1604-
tenantId: msg.tenantId,
1605-
error: error instanceof Error ? error.message : String(error),
1606-
});
1607-
}
1608-
}
1574+
// Dispatch indexes are updated atomically by the releaseMessage Lua script
1575+
// inside reclaimTimedOut, so no separate index update needed here.
16091576
}
16101577

16111578
totalReclaimed += reclaimedMessages.length;

0 commit comments

Comments
 (0)