fix(run-queue): prevent concurrency keys from bloating master queue shards #3219

Merged
ericallam merged 4 commits into main from ea-branch-121
Mar 14, 2026

Conversation

@ericallam ericallam commented Mar 14, 2026

Queues with concurrency keys now appear as a single entry in the master queue instead of one entry per key. This prevents high-CK-count tenants from consuming the entire parentQueueLimit window and starving other tenants on the same shard.

A new per-queue CK index (sorted set) tracks active concurrency key sub-queues. The master queue gets one :ck:* wildcard entry per base queue. Dequeuing from that entry round-robins across sub-queues, maintaining per-CK concurrency tracking and fairness.

All existing operations (enqueue, dequeue, ack, nack, DLQ, TTL expiry) are CK-index-aware and keep the index consistent. Old-format entries drain naturally during rollout, so no migration step is needed and the change ships in a single deploy.
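
To make the round-robin behaviour concrete, here is a minimal in-memory sketch of the idea. It is not the PR's code: the real implementation uses Redis sorted sets and Lua scripts, and the `CkQueueModel` class, its method names, and the per-key limit are all hypothetical. The `maxCount * 3` scan bound mirrors the `actualMaxCount * 3` limit mentioned later in this conversation.

```typescript
// Hypothetical in-memory model of the CK index plus round-robin dequeue.
// The actual implementation lives in Redis (sorted sets + Lua), not in JS maps.
type Message = { id: string; concurrencyKey: string };
type Entry = { score: number; message: Message };

class CkQueueModel {
  // CK index: concurrency key -> score of that sub-queue's oldest message.
  private ckIndex = new Map<string, number>();
  private subQueues = new Map<string, Entry[]>();
  private running = new Map<string, number>();
  private perKeyLimit: number;

  constructor(perKeyLimit: number) {
    this.perKeyLimit = perKeyLimit;
  }

  enqueue(message: Message, score: number): void {
    const queue = this.subQueues.get(message.concurrencyKey) ?? [];
    queue.push({ score, message });
    queue.sort((a, b) => a.score - b.score);
    this.subQueues.set(message.concurrencyKey, queue);
    this.ckIndex.set(message.concurrencyKey, queue[0]!.score);
  }

  // One pass over the index: at most ONE message per concurrency key.
  dequeue(maxCount: number): Message[] {
    const keys = [...this.ckIndex.entries()]
      .sort((a, b) => a[1] - b[1])
      .slice(0, maxCount * 3) // bounded scan of the index
      .map(([key]) => key);
    const dequeued: Message[] = [];
    for (const key of keys) {
      if (dequeued.length >= maxCount) break;
      const active = this.running.get(key) ?? 0;
      if (active >= this.perKeyLimit) continue; // this key is at its limit
      const queue = this.subQueues.get(key) ?? [];
      const next = queue.shift();
      if (!next) continue;
      dequeued.push(next.message);
      this.running.set(key, active + 1);
      const head = queue[0];
      if (head) {
        this.ckIndex.set(key, head.score); // re-score by the new oldest message
      } else {
        this.ckIndex.delete(key); // drained sub-queue leaves the index
        this.subQueues.delete(key);
      }
    }
    return dequeued;
  }

  ack(message: Message): void {
    const active = this.running.get(message.concurrencyKey) ?? 0;
    this.running.set(message.concurrencyKey, Math.max(0, active - 1));
  }

  activeKeyCount(): number {
    return this.ckIndex.size;
  }
}

const model = new CkQueueModel(1);
model.enqueue({ id: "a1", concurrencyKey: "tenant-a" }, 1);
model.enqueue({ id: "a2", concurrencyKey: "tenant-a" }, 2);
model.enqueue({ id: "b1", concurrencyKey: "tenant-b" }, 3);
const firstPass = model.dequeue(3).map((m) => m.id);
console.log(firstPass);
```

In this model a key with many pending messages cannot crowd out other keys within a single pass, because each pass takes at most one message per key.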

changeset-bot bot commented Mar 14, 2026

⚠️ No Changeset found

Latest commit: 6f7bcc5

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

coderabbitai bot commented Mar 14, 2026

Walkthrough

Adds concurrency-key (CK) support across the RunQueue implementation. New key utilities (ckIndexKeyFromQueue, baseQueueKeyFromQueue, isCkWildcard, toCkWildcard) were added to key producer/types. Enqueue/dequeue flows gained CK-aware variants so CK queues are deduplicated to a CK-wildcard master entry while CK-specific indexes track individual keys. Redis Lua scripts and RedisCommander were extended with CK-aware enqueue/dequeue/ack/nack/move-to-DLQ commands (including TTL variants). Master-queue orchestration and rebalance logic were updated to skip legacy rebalance for CK queues. Tests for CK index behavior and key production were added.
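
As a rough illustration of what the four key utilities named above might do, here is a sketch. The helper names come from the walkthrough, but the bodies, the `:ck:` parsing rule, the example key layout, and the `:ckIndex` suffix are assumptions, not the PR's actual implementation.

```typescript
// Sketch only: helper names are from the walkthrough, bodies are assumed.
const CK_SEGMENT = ":ck:";

// Strip the ":ck:<key>" suffix, if any. Assumes the base queue name itself
// never contains ":ck:".
function baseQueueKeyFromQueue(queue: string): string {
  const index = queue.indexOf(CK_SEGMENT);
  return index === -1 ? queue : queue.slice(0, index);
}

// Single master-queue entry shared by every concurrency key of a base queue.
function toCkWildcard(queue: string): string {
  return `${baseQueueKeyFromQueue(queue)}${CK_SEGMENT}*`;
}

function isCkWildcard(queue: string): boolean {
  return queue.endsWith(`${CK_SEGMENT}*`);
}

// Key of the per-queue CK index; the ":ckIndex" suffix is hypothetical.
function ckIndexKeyFromQueue(queue: string): string {
  return `${baseQueueKeyFromQueue(queue)}:ckIndex`;
}

// Many concurrency keys on one queue collapse to one master-queue member.
const subQueues = ["tenant-1", "tenant-2", "tenant-3"].map(
  (ck) => `env:e1:queue:emails${CK_SEGMENT}${ck}` // hypothetical key layout
);
const masterEntries = new Set(subQueues.map(toCkWildcard));
console.log(masterEntries.size, [...masterEntries]);
```

The deduplication falls out of mapping every sub-queue to the same wildcard string before it is added to the master queue.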

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The description explains the problem, the solution (CK index), and key implementation details, but does not follow the required template structure with checklist, testing, changelog, or screenshots sections. | Add the required template sections: checklist items, testing steps, changelog entry, and issue reference (Closes #) at the top. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly summarizes the main change: preventing concurrency keys from bloating the master queue, which is the core objective of this PR. |

coderabbitai[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

@devin-ai-integration devin-ai-integration bot left a comment

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 6 additional findings.

@ericallam ericallam marked this pull request as ready for review March 14, 2026 10:23
…work to reset the cooloff state when a queue is successfully dequeued from

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal-packages/run-engine/src/run-queue/index.ts (2)

1554-1568: Add @crumbs tracing markers in new CK control-flow blocks.

The newly added CK paths are good candidates for crumbs-based tracing, but no @crumbs markers were added in these new branches.

As per coding guidelines "Add crumbs as you write code using // @crumbs comments or // #region @crumbs blocks for agentcrumbs debug tracing".

Also applies to: 1656-1667, 1756-1799, 1960-2069

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 1554 -
1568, The new CK-specific branches around the queue dispatch lack crumbs tracing
markers; add // `@crumbs` comments at the start of the CK control-flow blocks
where this.keys.isCkWildcard(queue) chooses the CK path and inside the
CK-specific handlers (`#callDequeueMessagesFromCkQueue`,
`#callDequeueMessagesFromQueue`) so that agentcrumbs can trace these branches
(mirror the existing crumbs style used elsewhere in this file, and repeat
similar // `@crumbs` markers for the other CK blocks noted at the other ranges).

1394-1398: Use key-producer CK parsing instead of substring matching.

q.includes(":ck:") is fragile for CK detection. Use the queue-key helper so rebalance logic stays correct even if queue names contain similar substrings.

♻️ Suggested refactor
-        const nonCkQueues = queueNames.filter((q) => !q.includes(":ck:"));
+        const nonCkQueues = queueNames.filter(
+          (q) => this.keys.baseQueueKeyFromQueue(q) === q
+        );
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal-packages/run-engine/src/run-queue/index.ts` around lines 1394 -
1398, Replace the fragile substring check q.includes(":ck:") with the queue-key
helper's CK detection (e.g., this.keys.baseQueueKeyFromQueue(q) === q, as in the
suggested refactor) when building nonCkQueues from queueNames; filter using that
helper (so concurrency-key queues are detected correctly even if names contain
similar substrings), then dedupe into uniqueQueueNames and call
pipeline.migrateLegacyMasterQueues(masterQueueKey, keyPrefix,
...uniqueQueueNames) as before. Ensure you reference the queue-key helper
through this.keys and use its result in the filter instead of the string
include test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal-packages/run-engine/src/run-queue/index.ts`:
- Around line 2037-2040: The debug call in dequeueMessagesFromCkQueue currently
logs the entire raw CK payload (this.logger.debug("dequeueMessagesFromCkQueue
raw result", { result, service: this.name })), which may expose full message
contents; change it to avoid logging raw payloads by replacing the logged
"result" with a sanitized summary (e.g. message count, message IDs or hashed
IDs, total size, and/or truncated bodies) or remove the payload entirely and
only log metadata and this.name for tracing; ensure the change is made in the
dequeueMessagesFromCkQueue logging statement where this.logger.debug is called.
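
One way to act on this comment is to log a bounded summary instead of the raw result. The sketch below is illustrative only: the `DequeuedMessageLike` shape, the `summarizeDequeueResult` helper, and the `maxIds` cap are hypothetical and do not reflect the actual types in `index.ts`.

```typescript
// Hypothetical shape of an already-parsed dequeued message; the real
// payload type in run-queue/index.ts may differ.
type DequeuedMessageLike = {
  messageId: string;
  queue: string;
  payload?: unknown;
};

type DequeueLogSummary = {
  count: number;
  messageIds: string[];
  truncated: boolean;
};

// Build log-safe metadata: a count and a bounded list of IDs, no bodies.
function summarizeDequeueResult(
  messages: DequeuedMessageLike[],
  maxIds = 10
): DequeueLogSummary {
  return {
    count: messages.length,
    messageIds: messages.slice(0, maxIds).map((m) => m.messageId),
    truncated: messages.length > maxIds,
  };
}

const summary = summarizeDequeueResult([
  { messageId: "run_1", queue: "emails:ck:tenant-1", payload: { secret: "x" } },
  { messageId: "run_2", queue: "emails:ck:tenant-2", payload: { secret: "y" } },
]);

// A call site could then look like (logger and this.name as in the comment):
//   this.logger.debug("dequeueMessagesFromCkQueue result", { ...summary, service: this.name });
console.log(summary);
```

Capping the ID list keeps log lines bounded even when a large batch is dequeued.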


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b9113f35-56db-43fc-bd12-9840515da7b7

📥 Commits

Reviewing files that changed from the base of the PR and between b20477d and 6f7bcc5.

📒 Files selected for processing (1)
  • internal-packages/run-engine/src/run-queue/index.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: sdk-compat / Bun Runtime
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: sdk-compat / Deno Runtime
  • GitHub Check: typecheck / typecheck
🧰 Additional context used
📓 Path-based instructions (5)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

**/*.{ts,tsx}: Use task export syntax: export const myTask = task({ id: 'my-task', run: async (payload) => { ... } })
Use Run Engine 2.0 (@internal/run-engine) and redis-worker for all new work - avoid DEPRECATED zodworker (Graphile-worker wrapper)
Prisma 6.14.0 client and schema use PostgreSQL in internal-packages/database - import only from Prisma client

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
**/*.{ts,tsx,js}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{ts,tsx,js}: Always import from @trigger.dev/sdk for Trigger.dev tasks - never use @trigger.dev/sdk/v3 or deprecated client.defineJob
Import subpaths only from @trigger.dev/core, never import from root
Add crumbs as you write code using // @crumbs comments or // #region @crumbs blocks for agentcrumbs debug tracing

Files:

  • internal-packages/run-engine/src/run-queue/index.ts
🧠 Learnings (11)
📓 Common learnings
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:3264-3353
Timestamp: 2026-03-14T10:06:22.588Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, the `dequeueMessagesFromCkQueue` Lua command intentionally dequeues only 1 message per CK sub-queue per iteration (round-robin). This is by design for fairness across concurrency keys (CKs). The `actualMaxCount * 3` scan limit for `ZRANGEBYSCORE` on the CK index is also intentional — it is adequate because each CK independently gets the full queue concurrency limit, so per-CK concurrency blocks rarely occur.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3219
File: internal-packages/run-engine/src/run-queue/index.ts:774-777
Timestamp: 2026-03-14T10:24:34.005Z
Learning: In `internal-packages/run-engine/src/run-queue/index.ts`, checking `message.concurrencyKey` to branch CK vs non-CK logic is safe and intentional. The `concurrencyKey` field and the `:ck:` suffix in `message.queue` are always set atomically during the same enqueue call (`queueKey = this.keys.queueKey(env, message.queue, concurrencyKey)`). In `enqueueMessage`, `concurrencyKey` is the source of truth that creates the `:ck:` queue name (checking the queue name would be circular). In `#callAcknowledgeMessage`, `#callNackMessage`, and `#callMoveToDeadLetterQueue`, the `OutputPayload` is a single serialized JSON blob containing both fields — a message with `:ck:` in the queue name but a missing `concurrencyKey` cannot exist. Do not suggest replacing `message.concurrencyKey` checks with queue-name-derived predicates.
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: internal-packages/run-engine/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:25.254Z
Learning: Use Redis for distributed locks and queues in the RunEngine system via `RunQueue` and `RunLocker` utilities
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.
📚 Learning: 2026-03-02T12:43:43.173Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.173Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/queue.ts : Job queue abstraction should be Redis-backed in src/queue.ts

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-03T13:08:03.862Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:08:03.862Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-13T13:37:49.544Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T13:37:49.544Z
Learning: Applies to **/*.{ts,tsx} : Use Run Engine 2.0 (internal/run-engine) and redis-worker for all new work - avoid DEPRECATED zodworker (Graphile-worker wrapper)

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-02T12:43:43.173Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: packages/redis-worker/CLAUDE.md:0-0
Timestamp: 2026-03-02T12:43:43.173Z
Learning: Applies to packages/redis-worker/**/redis-worker/src/worker.ts : Worker loop and job processing should implement concurrency control in src/worker.ts

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2026-03-03T13:07:33.177Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:33.177Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option

Applied to files:

  • internal-packages/run-engine/src/run-queue/index.ts
🔇 Additional comments (3)
internal-packages/run-engine/src/run-queue/index.ts (3)

639-666: CK wildcard dedupe path is correctly applied across enqueue/ack/nack.

Using a single wildcard dedupe key for CK queues here is consistent and prevents per-key scheduling fan-out.

Also applies to: 774-785, 859-870


1554-1568: CK dequeue dispatch + cooloff reset fix look good.

Routing wildcard queues through the CK-aware dequeue function and clearing cooloff on successful dequeue addresses the stuck-cooloff behavior cleanly.

Also applies to: 1581-1584


2890-2999: CK Lua command surface is coherent and consistently wired.

The enqueue/dequeue/ack/nack/DLQ CK command set and the RedisCommander signatures line up well and preserve CK-index/master-wildcard maintenance.

Also applies to: 3233-3376, 3561-3720, 4048-4171

@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 2 new potential issues.

View 9 additional findings in Devin Review.

@ericallam ericallam merged commit 7672e8d into main Mar 14, 2026
51 checks passed
@ericallam ericallam deleted the ea-branch-121 branch March 14, 2026 13:37

2 participants