fix(run-queue): prevent concurrency keys from bloating master queue shards#3219
fix(run-queue): prevent concurrency keys from bloating master queue shards#3219
Conversation
|
WalkthroughAdds concurrency-key (CK) support across the RunQueue implementation. New key utilities (ckIndexKeyFromQueue, baseQueueKeyFromQueue, isCkWildcard, toCkWildcard) were added to key producer/types. Enqueue/dequeue flows gained CK-aware variants so CK queues are deduplicated to a CK-wildcard master entry while CK-specific indexes track individual keys. Redis Lua scripts and RedisCommander were extended with CK-aware enqueue/dequeue/ack/nack/move-to-DLQ commands (including TTL variants). Master-queue orchestration and rebalance logic were updated to skip legacy rebalance for CK queues. Tests for CK index behavior and key production were added. Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…work to reset the cooloff state when a queue is successfully dequeued from
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal-packages/run-engine/src/run-queue/index.ts (2)
1554-1568: Add@crumbstracing markers in new CK control-flow blocks.The newly added CK paths are good candidates for crumbs-based tracing, but no
@crumbsmarkers were added in these new branches.As per coding guidelines "Add crumbs as you write code using //
@crumbscomments or //#region@crumbsblocks for agentcrumbs debug tracing".Also applies to: 1656-1667, 1756-1799, 1960-2069
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 1554 - 1568, The new CK-specific branches around the queue dispatch lack crumbs tracing markers; add // `@crumbs` comments at the start of the CK control-flow blocks where this.keys.isCkWildcard(queue) chooses the CK path and inside the CK-specific handlers (`#callDequeueMessagesFromCkQueue`, `#callDequeueMessagesFromQueue`) so that agentcrumbs can trace these branches (mirror the existing crumbs style used elsewhere in this file, and repeat similar // `@crumbs` markers for the other CK blocks noted at the other ranges).
1394-1398: Use key-producer CK parsing instead of substring matching.
q.includes(":ck:")is fragile for CK detection. Use the queue-key helper so rebalance logic stays correct even if queue names contain similar substrings.♻️ Suggested refactor
- const nonCkQueues = queueNames.filter((q) => !q.includes(":ck:")); + const nonCkQueues = queueNames.filter( + (q) => this.keys.baseQueueKeyFromQueue(q) === q + );🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 1394 - 1398, Replace the fragile substring check q.includes(":ck:") with the queue-key helper's CK detection (e.g., use the helper function that parses/identifies checkpoint queues) when building nonCkQueues from queueNames; filter using that helper (so checkpoint queues are detected correctly even if names contain similar substrings), then dedupe into uniqueQueueNames and call pipeline.migrateLegacyMasterQueues(masterQueueKey, keyPrefix, ...uniqueQueueNames) as before. Ensure you import/reference the queue-key helper (the parse/isCheckpoint function) and use its boolean result in the filter instead of the string include test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal-packages/run-engine/src/run-queue/index.ts`:
- Around line 2037-2040: The debug call in dequeueMessagesFromCkQueue currently
logs the entire raw CK payload (this.logger.debug("dequeueMessagesFromCkQueue
raw result", { result, service: this.name })), which may expose full message
contents; change it to avoid logging raw payloads by replacing the logged
"result" with a sanitized summary (e.g. message count, message IDs or hashed
IDs, total size, and/or truncated bodies) or remove the payload entirely and
only log metadata and this.name for tracing; ensure the change is made in the
dequeueMessagesFromCkQueue logging statement where this.logger.debug is called.
---
Nitpick comments:
In `@internal-packages/run-engine/src/run-queue/index.ts`:
- Around line 1554-1568: The new CK-specific branches around the queue dispatch
lack crumbs tracing markers; add // `@crumbs` comments at the start of the CK
control-flow blocks where this.keys.isCkWildcard(queue) chooses the CK path and
inside the CK-specific handlers (`#callDequeueMessagesFromCkQueue`,
`#callDequeueMessagesFromQueue`) so that agentcrumbs can trace these branches
(mirror the existing crumbs style used elsewhere in this file, and repeat
similar // `@crumbs` markers for the other CK blocks noted at the other ranges).
- Around line 1394-1398: Replace the fragile substring check q.includes(":ck:")
with the queue-key helper's CK detection (e.g., use the helper function that
parses/identifies checkpoint queues) when building nonCkQueues from queueNames;
filter using that helper (so checkpoint queues are detected correctly even if
names contain similar substrings), then dedupe into uniqueQueueNames and call
pipeline.migrateLegacyMasterQueues(masterQueueKey, keyPrefix,
...uniqueQueueNames) as before. Ensure you import/reference the queue-key helper
(the parse/isCheckpoint function) and use its boolean result in the filter
instead of the string include test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: b9113f35-56db-43fc-bd12-9840515da7b7
📒 Files selected for processing (1)
internal-packages/run-engine/src/run-queue/index.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
- GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
- GitHub Check: sdk-compat / Bun Runtime
- GitHub Check: sdk-compat / Cloudflare Workers
- GitHub Check: sdk-compat / Deno Runtime
- GitHub Check: typecheck / typecheck
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
**/*.{ts,tsx}: Use task export syntax: export const myTask = task({ id: 'my-task', run: async (payload) => { ... } })
Use Run Engine 2.0 (@internal/run-engine) and redis-worker for all new work - avoid DEPRECATED zodworker (Graphile-worker wrapper)
Prisma 6.14.0 client and schema use PostgreSQL in internal-packages/database - import only from Prisma client
Files:
internal-packages/run-engine/src/run-queue/index.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
internal-packages/run-engine/src/run-queue/index.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
internal-packages/run-engine/src/run-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier before committing
Files:
internal-packages/run-engine/src/run-queue/index.ts
**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.{ts,tsx,js}: Always import from@trigger.dev/sdkfor Trigger.dev tasks - never use@trigger.dev/sdk/v3or deprecated client.defineJob
Import subpaths only from@trigger.dev/core, never import from root
Add crumbs as you write code using //@crumbscomments or //#region@crumbsblocks for agentcrumbs debug tracing
Files:
internal-packages/run-engine/src/run-queue/index.ts
🧠 Learnings (11)
📓 Common learnings
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:3264-3353
Timestamp: 2026-03-14T10:06:22.588Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, the `dequeueMessagesFromCkQueue` Lua command intentionally dequeues only 1 message per CK sub-queue per iteration (round-robin). This is by design for fairness across concurrency keys (CKs). The `actualMaxCount * 3` scan limit for `ZRANGEBYSCORE` on the CK index is also intentional — it is adequate because each CK independently gets the full queue concurrency limit, so per-CK concurrency blocks rarely occur.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:774-777
Timestamp: 2026-03-14T10:24:34.005Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, checking `message.concurrencyKey` to branch CK vs non-CK logic is safe and intentional. The `concurrencyKey` field and the `:ck:` suffix in `message.queue` are always set atomically during the same enqueue call (`queueKey = this.keys.queueKey(env, message.queue, concurrencyKey)`). In `enqueueMessage`, `concurrencyKey` is the source of truth that creates the `:ck:` queue name (checking the queue name would be circular). In `#callAcknowledgeMessage`, `#callNackMessage`, and `#callMoveToDeadLetterQueue`, the `OutputPayload` is a single serialized JSON blob containing both fields — a message with `:ck:` in the queue name but a missing `concurrencyKey` cannot exist. Do not suggest replacing `message.concurrencyKey` checks with queue-name-derived predicates.
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: internal-packages/run-engine/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:25.254Z
Learning: Use Redis for distributed locks and queues in the RunEngine system via `RunQueue` and `RunLocker` utilities
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.
📚 Learning: 2026-03-14T10:06:22.588Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:3264-3353
Timestamp: 2026-03-14T10:06:22.588Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, the `dequeueMessagesFromCkQueue` Lua command intentionally dequeues only 1 message per CK sub-queue per iteration (round-robin). This is by design for fairness across concurrency keys (CKs). The `actualMaxCount * 3` scan limit for `ZRANGEBYSCORE` on the CK index is also intentional — it is adequate because each CK independently gets the full queue concurrency limit, so per-CK concurrency blocks rarely occur.
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-14T10:24:34.005Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:774-777
Timestamp: 2026-03-14T10:24:34.005Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, checking `message.concurrencyKey` to branch CK vs non-CK logic is safe and intentional. The `concurrencyKey` field and the `:ck:` suffix in `message.queue` are always set atomically during the same enqueue call (`queueKey = this.keys.queueKey(env, message.queue, concurrencyKey)`). In `enqueueMessage`, `concurrencyKey` is the source of truth that creates the `:ck:` queue name (checking the queue name would be circular). In `#callAcknowledgeMessage`, `#callNackMessage`, and `#callMoveToDeadLetterQueue`, the `OutputPayload` is a single serialized JSON blob containing both fields — a message with `:ck:` in the queue name but a missing `concurrencyKey` cannot exist. Do not suggest replacing `message.concurrencyKey` checks with queue-name-derived predicates.
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.173Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.173Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/queue.ts : Job queue abstraction should be Redis-backed in src/queue.ts
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-03T13:08:03.862Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:08:03.862Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-02T12:43:25.254Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: internal-packages/run-engine/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:25.254Z
Learning: Use Redis for distributed locks and queues in the RunEngine system via `RunQueue` and `RunLocker` utilities
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-13T13:37:49.544Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T13:37:49.544Z
Learning: Applies to **/*.{ts,tsx} : Use Run Engine 2.0 (internal/run-engine) and redis-worker for all new work - avoid DEPRECATED zodworker (Graphile-worker wrapper)
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.173Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.173Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/worker.ts : Worker loop and job processing should implement concurrency control in src/worker.ts
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-03T13:07:33.177Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:33.177Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
Applied to files:
internal-packages/run-engine/src/run-queue/index.ts
🔇 Additional comments (3)
internal-packages/run-engine/src/run-queue/index.ts (3)
639-666: CK wildcard dedupe path is correctly applied across enqueue/ack/nack.Using a single wildcard dedupe key for CK queues here is consistent and prevents per-key scheduling fan-out.
Also applies to: 774-785, 859-870
1554-1568: CK dequeue dispatch + cooloff reset fix look good.Routing wildcard queues through the CK-aware dequeue function and clearing cooloff on successful dequeue addresses the stuck-cooloff behavior cleanly.
Also applies to: 1581-1584
2890-2999: CK Lua command surface is coherent and consistently wired.The enqueue/dequeue/ack/nack/DLQ CK command set and the
RedisCommandersignatures line up well and preserve CK-index/master-wildcard maintenance.Also applies to: 3233-3376, 3561-3720, 4048-4171
Queues with concurrency keys now appear as a single entry in the master queue instead of one entry per key. This prevents high-CK-count tenants from consuming the entire
parentQueueLimitwindow and starving other tenants on the same shard.A new per-queue CK index (sorted set) tracks active concurrency key sub-queues. The master queue gets one
:ck:*wildcard entry per base queue. Dequeuing from that entry round-robins across sub-queues, maintaining per-CK concurrency tracking and fairness.All existing operations (enqueue, dequeue, ack, nack, DLQ, TTL expiry) are CK-index-aware and keep the index consistent. Old-format entries drain naturally during rollout — no migration step needed, single deploy.