Skip to content

Commit 8a50142

Browse files
committed
fix(run-engine): lazy-init runningCounter on release/clear paths too
1 parent 1866ff1 commit 8a50142

2 files changed

Lines changed: 120 additions & 7 deletions

File tree

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -986,7 +986,9 @@ export class RunQueue {
986986
this.keys.queueCurrentDequeuedKeyFromQueue(message.queue),
987987
this.keys.envCurrentDequeuedKeyFromQueue(message.queue),
988988
this.keys.queueRunningCounterKeyFromQueue(message.queue),
989-
messageId
989+
this.keys.ckIndexKeyFromQueue(message.queue),
990+
messageId,
991+
this.options.redis.keyPrefix ?? ""
990992
);
991993
}
992994

@@ -2508,7 +2510,9 @@ export class RunQueue {
25082510
queueCurrentDequeuedKey,
25092511
envCurrentDequeuedKey,
25102512
this.keys.queueRunningCounterKeyFromQueue(queue),
2511-
messageId
2513+
this.keys.ckIndexKeyFromQueue(queue),
2514+
messageId,
2515+
this.options.redis.keyPrefix ?? ""
25122516
);
25132517
}
25142518

@@ -4335,6 +4339,9 @@ local envCurrentDequeuedKey = keyPrefix .. string.match(queue, "(.+):queue:") ..
43354339
43364340
-- SADD first so we know if this dequeue is new (return 1) or a duplicate (return 0).
43374341
-- INCR runningCounter is gated on the new-membership result so re-dequeues don't inflate.
4342+
-- The alternative (lazy-init before SADD) was rejected because we need the SADD return
4343+
-- value to gate the INCR, and the lazy-init seed under SADD-first already reflects the
4344+
-- new membership. We compensate by using (total - 1) in the seed math below.
43384345
local addedDeq = redis.call('SADD', queueCurrentDequeuedKey, messageData.runId)
43394346
redis.call('SADD', envCurrentDequeuedKey, messageData.runId)
43404347
@@ -4932,15 +4939,33 @@ redis.call('SREM', envCurrentDequeuedKey, messageId)
49324939
// something. Caller should only invoke this variant for CK queues — non-CK
49334940
// queues should keep calling releaseConcurrency.
49344941
this.redis.defineCommand("releaseConcurrencyTracked", {
4935-
numberOfKeys: 5,
4942+
numberOfKeys: 6,
49364943
lua: `
4944+
-- Keys:
49374945
local queueCurrentConcurrencyKey = KEYS[1]
49384946
local envCurrentConcurrencyKey = KEYS[2]
49394947
local queueCurrentDequeuedKey = KEYS[3]
49404948
local envCurrentDequeuedKey = KEYS[4]
49414949
local runningCounterKey = KEYS[5]
4950+
local ckIndexKey = KEYS[6]
49424951
4952+
-- Args:
49434953
local messageId = ARGV[1]
4954+
local keyPrefix = ARGV[2]
4955+
4956+
-- Lazy-init runningCounter if missing (e.g. expired via 24h TTL). Runs BEFORE
4957+
-- the SREM so the seed captures pre-release state; the subsequent DECR accounts
4958+
-- for the message we're about to release. Without this, a release landing after
4959+
-- counter expiry would silently no-op the DECR and the next dequeue would seed
4960+
-- to post-release truth — bounded drift but inconsistent with nack/enqueue.
4961+
if redis.call('EXISTS', runningCounterKey) == 0 then
4962+
local total = 0
4963+
local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
4964+
for _, v in ipairs(variants) do
4965+
total = total + tonumber(redis.call('SCARD', keyPrefix .. v .. ':currentDequeued') or '0')
4966+
end
4967+
redis.call('SET', runningCounterKey, total, 'EX', 86400)
4968+
end
49444969
49454970
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
49464971
redis.call('SREM', envCurrentConcurrencyKey, messageId)
@@ -5057,15 +5082,29 @@ redis.call('SREM', envCurrentDequeuedKey, messageId)
50575082
// Tracked variant of clearMessageFromConcurrencySets — see releaseConcurrencyTracked
50585083
// for the contract. Only invoke for CK queues.
50595084
this.redis.defineCommand("clearMessageFromConcurrencySetsTracked", {
5060-
numberOfKeys: 5,
5085+
numberOfKeys: 6,
50615086
lua: `
5087+
-- Keys:
50625088
local queueCurrentConcurrencyKey = KEYS[1]
50635089
local envCurrentConcurrencyKey = KEYS[2]
50645090
local queueCurrentDequeuedKey = KEYS[3]
50655091
local envCurrentDequeuedKey = KEYS[4]
50665092
local runningCounterKey = KEYS[5]
5093+
local ckIndexKey = KEYS[6]
50675094
5095+
-- Args:
50685096
local messageId = ARGV[1]
5097+
local keyPrefix = ARGV[2]
5098+
5099+
-- Lazy-init runningCounter if missing — see releaseConcurrencyTracked for rationale.
5100+
if redis.call('EXISTS', runningCounterKey) == 0 then
5101+
local total = 0
5102+
local variants = redis.call('ZRANGE', ckIndexKey, 0, -1)
5103+
for _, v in ipairs(variants) do
5104+
total = total + tonumber(redis.call('SCARD', keyPrefix .. v .. ':currentDequeued') or '0')
5105+
end
5106+
redis.call('SET', runningCounterKey, total, 'EX', 86400)
5107+
end
50695108
50705109
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
50715110
redis.call('SREM', envCurrentConcurrencyKey, messageId)
@@ -5608,7 +5647,9 @@ declare module "@internal/redis" {
56085647
queueCurrentDequeuedKey: string,
56095648
envCurrentDequeuedKey: string,
56105649
runningCounterKey: string,
5650+
ckIndexKey: string,
56115651
messageId: string,
5652+
keyPrefix: string,
56125653
callback?: Callback<void>
56135654
): Result<void, Context>;
56145655

@@ -5618,7 +5659,9 @@ declare module "@internal/redis" {
56185659
queueCurrentDequeuedKey: string,
56195660
envCurrentDequeuedKey: string,
56205661
runningCounterKey: string,
5662+
ckIndexKey: string,
56215663
messageId: string,
5664+
keyPrefix: string,
56225665
callback?: Callback<void>
56235666
): Result<void, Context>;
56245667
}

internal-packages/run-engine/src/run-queue/tests/ckCounters.test.ts

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,9 @@ describe("CK base-queue counters", () => {
317317
testOptions.keys.queueCurrentDequeuedKeyFromQueue(variantA),
318318
testOptions.keys.envCurrentDequeuedKey(authenticatedEnvDev),
319319
runningCounterKey,
320-
"phantom-message"
320+
testOptions.keys.ckIndexKeyFromQueue(variantA),
321+
"phantom-message",
322+
"runqueue:test:"
321323
);
322324

323325
expect(Number(await queue.redis.get(runningCounterKey))).toBe(0);
@@ -344,8 +346,8 @@ describe("CK base-queue counters", () => {
344346
"task/my-task"
345347
);
346348
const ttl = await queue.redis.ttl(counterKey);
347-
// Expect roughly 86400; allow slack for test scheduling.
348-
expect(ttl).toBeGreaterThan(86000);
349+
// Expect roughly 86400; allow only a few seconds of slack for test scheduling.
350+
expect(ttl).toBeGreaterThanOrEqual(86390);
349351
expect(ttl).toBeLessThanOrEqual(86400);
350352
} finally {
351353
await queue.quit();
@@ -383,6 +385,74 @@ describe("CK base-queue counters", () => {
383385
}
384386
);
385387

388+
redisTest(
389+
"duplicate nack (runId already in variant zset) does not inflate lengthCounter",
390+
async ({ redisContainer }) => {
391+
const queue = createQueue(redisContainer);
392+
try {
393+
const msg = makeMessage({ runId: "r1", concurrencyKey: "ck-a" });
394+
395+
await queue.enqueueMessage({
396+
env: authenticatedEnvDev,
397+
message: msg,
398+
workerQueue: authenticatedEnvDev.id,
399+
skipDequeueProcessing: true,
400+
});
401+
expect(await queue.lengthOfQueue(authenticatedEnvDev, msg.queue)).toBe(1);
402+
403+
// Nack without dequeuing first — the message is still in the variant zset.
404+
// ZADD returns 0 (already present), so lengthCounter must not bump.
405+
await queue.nackMessage({
406+
orgId: msg.orgId,
407+
messageId: msg.runId,
408+
skipDequeueProcessing: true,
409+
incrementAttemptCount: false,
410+
});
411+
expect(await queue.lengthOfQueue(authenticatedEnvDev, msg.queue)).toBe(1);
412+
413+
// A second nack on the same runId must still be a no-op for the counter.
414+
await queue.nackMessage({
415+
orgId: msg.orgId,
416+
messageId: msg.runId,
417+
skipDequeueProcessing: true,
418+
incrementAttemptCount: false,
419+
});
420+
expect(await queue.lengthOfQueue(authenticatedEnvDev, msg.queue)).toBe(1);
421+
} finally {
422+
await queue.quit();
423+
}
424+
}
425+
);
426+
427+
redisTest(
428+
"duplicate dequeueMessageFromKey (same runId) does not inflate runningCounter",
429+
async ({ redisContainer }) => {
430+
const queue = createQueue(redisContainer);
431+
try {
432+
const msg = makeMessage({ runId: "r1", concurrencyKey: "ck-a" });
433+
const queueKey = testOptions.keys.queueKey(authenticatedEnvDev, msg.queue, "ck-a");
434+
const messageKey = testOptions.keys.messageKey(msg.orgId, msg.runId);
435+
const runningCounterKey =
436+
testOptions.keys.queueRunningCounterKeyFromQueue(queueKey);
437+
438+
await queue.redis.set(
439+
messageKey,
440+
JSON.stringify({ ...msg, queue: queueKey, version: "2", workerQueue: "wq" })
441+
);
442+
443+
// First call: SADD returns 1, runningCounter goes 0 -> 1.
444+
await queue.redis.dequeueMessageFromKeyTracked(messageKey, "runqueue:test:");
445+
expect(Number(await queue.redis.get(runningCounterKey))).toBe(1);
446+
447+
// Second call on the same messageKey: SADD returns 0, runningCounter must stay at 1.
448+
await queue.redis.dequeueMessageFromKeyTracked(messageKey, "runqueue:test:");
449+
expect(Number(await queue.redis.get(runningCounterKey))).toBe(1);
450+
} finally {
451+
await queue.quit();
452+
}
453+
}
454+
);
455+
386456
redisTest(
387457
"nack lazy-inits lengthCounter when it expired",
388458
async ({ redisContainer }) => {

0 commit comments

Comments
 (0)