Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .server-changes/run-queue-sweeper-stale-entry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
area: webapp
type: fix
---

Concurrency sweeper now removes the message from the worker queue list
when acking marked runs, eliminating stale `messageKey` tombstones that
produced "Failed to dequeue message from worker queue" errors when
consumed by a later BLPOP.
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,7 @@ export class RunQueue {

await this.acknowledgeMessage(run.orgId, run.messageId, {
skipDequeueProcessing: true,
removeFromWorkerQueue: false,
removeFromWorkerQueue: true,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,93 @@ describe("RunQueue Concurrency Sweeper", () => {
}
}
);

// When the sweeper acks a run whose messageKey value is still sitting on the worker
// queue list (e.g. fast-path enqueued, never BLPOP'd), it must remove the entry from
// the list as well as deleting the message body. Otherwise the list keeps a stale
// tombstone — the next BLPOP returns the messageKey, GET returns nil, and the dequeue
// path logs "Failed to dequeue message from worker queue".
redisTest(
"should clear the worker queue list when sweeper acks a fast-path-enqueued run",
async ({ redisContainer }) => {
let enableConcurrencySweeper = false;

const queue = new RunQueue({
...testOptions,
logLevel: "debug",
queueSelectionStrategy: new FairQueueSelectionStrategy({
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
keys: testOptions.keys,
}),
workerOptions: {
pollIntervalMs: 100,
immediatePollIntervalMs: 100,
},
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
concurrencySweeper: {
scanSchedule: "* * * * * *",
scanJitterInMs: 5,
processMarkedSchedule: "* * * * * *",
processMarkedJitterInMs: 5,
callback: async (runIds) => {
if (!enableConcurrencySweeper) {
return [];
}
return [{ id: messageDev.runId, orgId: "o1234" }];
},
},
});

try {
await queue.updateEnvConcurrencyLimits(authenticatedEnvDev);

// Fast-path enqueue: SET messageKey, RPUSH messageKeyValue onto worker queue list,
// SADD runId into currentConcurrency. The message is on the list waiting to be popped.
await queue.enqueueMessage({
env: authenticatedEnvDev,
message: messageDev,
workerQueue: authenticatedEnvDev.id,
enableFastPath: true,
});

// Pre-conditions: list has the entry, run is "in-flight" per operational concurrency,
// body exists. Fast-path bumps the operational currentConcurrency (SADD) but not
// currentDequeued — the displayed concurrency is bumped only when a worker BLPOPs.
expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toHaveLength(1);
expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(1);
expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeDefined();

// Sweeper now considers the run completed (test callback returns it), so
// processMarkedRun acks with removeFromWorkerQueue: true.
enableConcurrencySweeper = true;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
await setTimeout(5_000);

// Sweeper has run: operational concurrency released, message body deleted, AND
// the messageKey value has been LREM'd from the worker queue list. Without the
// LREM, the list would still contain the messageKey, and the next BLPOP would
// pop the tombstone and emit "Failed to dequeue message from worker queue".
expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0);
expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeUndefined();
expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toEqual([]);

// A subsequent blocking dequeue finds nothing — no real message and no tombstone.
const dequeued = await queue.dequeueMessageFromWorkerQueue(
"test_consumer",
authenticatedEnvDev.id,
{ blockingPop: true, blockingPopTimeoutSeconds: 2 }
);
expect(dequeued).toBeUndefined();
} finally {
await queue.quit();
}
}
);
});
Loading