Skip to content

Commit dee6f1d

Browse files
authored
fix(batch): move batch queue global rate limiter to worker consumer level (#3166)
The global rate limiter was being applied at the FairQueue claim phase, consuming 1 token per queue-claim attempt rather than per item processed. With many small queues (each batch is its own queue), consumers burned through tokens on empty or single-item queues, causing aggressive throttling well below the intended items/sec limit.

Changes:
- Move the rate limiter from the FairQueue claim phase to the BatchQueue worker queue consumer loop (before blockingPop), so each token = 1 item processed
- Replace the FairQueue rate limiter with a worker queue depth cap to prevent unbounded growth that could cause visibility timeouts
- Add the BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default)
1 parent b192b71 commit dee6f1d

File tree

11 files changed

+310
-19
lines changed

11 files changed

+310
-19
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Move batch queue global rate limiter from FairQueue claim phase to BatchQueue worker queue consumer for accurate per-item rate limiting. Add worker queue depth cap to prevent unbounded growth that could cause visibility timeouts.

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,9 @@ const EnvironmentSchema = z
993993
// Global rate limit: max items processed per second across all consumers
994994
// If not set, no global rate limiting is applied
995995
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
996+
// Max items in the worker queue before claiming pauses (protects visibility timeouts)
997+
// If not set, no depth limit is applied
998+
BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(),
996999

9971000
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
9981001
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ function createRunEngine() {
198198
globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT
199199
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
200200
: undefined,
201+
// Worker queue depth cap - prevents unbounded growth protecting visibility timeouts
202+
workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH,
201203
retry: {
202204
maxAttempts: 6,
203205
minTimeoutInMs: 1_000,

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
isAbortError,
1919
WorkerQueueManager,
2020
type FairQueueOptions,
21+
type GlobalRateLimiter,
2122
} from "@trigger.dev/redis-worker";
2223
import { BatchCompletionTracker } from "./completionTracker.js";
2324
import type {
@@ -76,6 +77,7 @@ export class BatchQueue {
7677
private abortController: AbortController;
7778
private workerQueueConsumerLoops: Promise<void>[] = [];
7879
private workerQueueBlockingTimeoutSeconds: number;
80+
private globalRateLimiter?: GlobalRateLimiter;
7981
private batchedSpanManager: BatchedSpanManager;
8082

8183
// Metrics
@@ -87,6 +89,7 @@ export class BatchQueue {
8789
private batchProcessingDurationHistogram?: Histogram;
8890
private itemQueueTimeHistogram?: Histogram;
8991
private workerQueueLengthGauge?: ObservableGauge;
92+
private rateLimitDeniedCounter?: Counter;
9093

9194
constructor(private options: BatchQueueOptions) {
9295
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
@@ -95,6 +98,7 @@ export class BatchQueue {
9598
this.maxAttempts = options.retry?.maxAttempts ?? 1;
9699
this.abortController = new AbortController();
97100
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;
101+
this.globalRateLimiter = options.globalRateLimiter;
98102

99103
// Initialize metrics if meter is provided
100104
if (options.meter) {
@@ -174,8 +178,9 @@ export class BatchQueue {
174178
},
175179
},
176180
],
177-
// Optional global rate limiter to limit max items/sec across all consumers
178-
globalRateLimiter: options.globalRateLimiter,
181+
// Worker queue depth cap to prevent unbounded growth (protects visibility timeouts)
182+
workerQueueMaxDepth: options.workerQueueMaxDepth,
183+
workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID,
179184
// Enable retry with DLQ disabled when retry config is provided.
180185
// BatchQueue handles the "final failure" in its own processing loop,
181186
// so we don't need the DLQ - we just need the retry scheduling.
@@ -608,6 +613,11 @@ export class BatchQueue {
608613
unit: "ms",
609614
});
610615

616+
this.rateLimitDeniedCounter = meter.createCounter("batch_queue.rate_limit_denied", {
617+
description: "Number of times the global rate limiter denied processing",
618+
unit: "denials",
619+
});
620+
611621
this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", {
612622
description: "Number of items waiting in the batch worker queue",
613623
unit: "items",
@@ -641,6 +651,42 @@ export class BatchQueue {
641651
}
642652

643653
try {
654+
// Rate limit per-item at the processing level (1 token per message).
655+
// Loop until allowed so multiple consumers don't all rush through after one sleep.
656+
if (this.globalRateLimiter) {
657+
while (this.isRunning) {
658+
const result = await this.globalRateLimiter.limit();
659+
if (result.allowed) {
660+
break;
661+
}
662+
this.rateLimitDeniedCounter?.add(1);
663+
const waitMs = Math.max(10, (result.resetAt ?? Date.now()) - Date.now());
664+
if (waitMs > 0) {
665+
await new Promise<void>((resolve, reject) => {
666+
const onAbort = () => {
667+
clearTimeout(timer);
668+
reject(this.abortController.signal.reason);
669+
};
670+
const timer = setTimeout(() => {
671+
// Must remove listener when timeout fires, otherwise listeners accumulate
672+
// (the { once: true } option only removes on abort, not on timeout)
673+
this.abortController.signal.removeEventListener("abort", onAbort);
674+
resolve();
675+
}, waitMs);
676+
if (this.abortController.signal.aborted) {
677+
clearTimeout(timer);
678+
reject(this.abortController.signal.reason);
679+
return;
680+
}
681+
this.abortController.signal.addEventListener("abort", onAbort, { once: true });
682+
});
683+
}
684+
}
685+
if (!this.isRunning) {
686+
break;
687+
}
688+
}
689+
644690
await this.batchedSpanManager.withBatchedSpan(
645691
loopId,
646692
async (span) => {

internal-packages/run-engine/src/batch-queue/tests/index.test.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { redisTest } from "@internal/testcontainers";
22
import { describe, expect, vi } from "vitest";
33
import { BatchQueue } from "../index.js";
4+
import type { GlobalRateLimiter } from "@trigger.dev/redis-worker";
45
import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js";
56

67
vi.setConfig({ testTimeout: 60_000 });
@@ -658,4 +659,121 @@ describe("BatchQueue", () => {
658659
}
659660
);
660661
});
662+
663+
describe("global rate limiter at worker queue consumer level", () => {
664+
redisTest(
665+
"should call rate limiter before each processing attempt",
666+
async ({ redisContainer }) => {
667+
let limitCallCount = 0;
668+
const rateLimiter: GlobalRateLimiter = {
669+
async limit() {
670+
limitCallCount++;
671+
return { allowed: true };
672+
},
673+
};
674+
675+
const queue = new BatchQueue({
676+
redis: {
677+
host: redisContainer.getHost(),
678+
port: redisContainer.getPort(),
679+
keyPrefix: "test:",
680+
},
681+
drr: { quantum: 5, maxDeficit: 50 },
682+
consumerCount: 1,
683+
consumerIntervalMs: 50,
684+
startConsumers: true,
685+
globalRateLimiter: rateLimiter,
686+
});
687+
688+
let completionResult: CompleteBatchResult | null = null;
689+
690+
try {
691+
queue.onProcessItem(async ({ itemIndex }) => {
692+
return { success: true, runId: `run_${itemIndex}` };
693+
});
694+
695+
queue.onBatchComplete(async (result) => {
696+
completionResult = result;
697+
});
698+
699+
const itemCount = 5;
700+
await queue.initializeBatch(createInitOptions("batch1", "env1", itemCount));
701+
await enqueueItems(queue, "batch1", "env1", createBatchItems(itemCount));
702+
703+
await vi.waitFor(
704+
() => {
705+
expect(completionResult).not.toBeNull();
706+
},
707+
{ timeout: 10000 }
708+
);
709+
710+
expect(completionResult!.successfulRunCount).toBe(itemCount);
711+
// Rate limiter is called before each blockingPop, including iterations
712+
// where no message is available, so count >= items processed
713+
expect(limitCallCount).toBeGreaterThanOrEqual(itemCount);
714+
} finally {
715+
await queue.close();
716+
}
717+
}
718+
);
719+
720+
redisTest(
721+
"should delay processing when rate limited",
722+
async ({ redisContainer }) => {
723+
let limitCallCount = 0;
724+
const rateLimiter: GlobalRateLimiter = {
725+
async limit() {
726+
limitCallCount++;
727+
// Rate limit the first 3 calls, then allow
728+
if (limitCallCount <= 3) {
729+
return { allowed: false, resetAt: Date.now() + 100 };
730+
}
731+
return { allowed: true };
732+
},
733+
};
734+
735+
const queue = new BatchQueue({
736+
redis: {
737+
host: redisContainer.getHost(),
738+
port: redisContainer.getPort(),
739+
keyPrefix: "test:",
740+
},
741+
drr: { quantum: 5, maxDeficit: 50 },
742+
consumerCount: 1,
743+
consumerIntervalMs: 50,
744+
startConsumers: true,
745+
globalRateLimiter: rateLimiter,
746+
});
747+
748+
let completionResult: CompleteBatchResult | null = null;
749+
750+
try {
751+
queue.onProcessItem(async ({ itemIndex }) => {
752+
return { success: true, runId: `run_${itemIndex}` };
753+
});
754+
755+
queue.onBatchComplete(async (result) => {
756+
completionResult = result;
757+
});
758+
759+
await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
760+
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));
761+
762+
// Should still complete despite initial rate limiting
763+
await vi.waitFor(
764+
() => {
765+
expect(completionResult).not.toBeNull();
766+
},
767+
{ timeout: 10000 }
768+
);
769+
770+
expect(completionResult!.successfulRunCount).toBe(3);
771+
// Rate limiter was called more times than items due to initial rejections
772+
expect(limitCallCount).toBeGreaterThan(3);
773+
} finally {
774+
await queue.close();
775+
}
776+
}
777+
);
778+
});
661779
});

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,15 @@ export type BatchQueueOptions = {
213213
/**
214214
* Optional global rate limiter to limit processing across all consumers.
215215
* When configured, limits the max items/second processed globally.
216+
* Rate limiting happens at the worker queue consumer level (1 token per item).
216217
*/
217218
globalRateLimiter?: GlobalRateLimiter;
219+
/**
220+
* Maximum number of items allowed in the worker queue before claiming pauses.
221+
* Prevents unbounded worker queue growth which could cause visibility timeouts.
222+
* Disabled by default (undefined = no limit).
223+
*/
224+
workerQueueMaxDepth?: number;
218225
/** Logger instance */
219226
logger?: Logger;
220227
logLevel?: LogLevel;

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ export class RunEngine {
398398
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
399399
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
400400
globalRateLimiter: options.batchQueue?.globalRateLimiter,
401+
workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth,
401402
startConsumers: startBatchQueueConsumers,
402403
retry: options.batchQueue?.retry,
403404
tracer: options.tracer,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ export type RunEngineOptions = {
109109
defaultConcurrency?: number;
110110
/** Optional global rate limiter to limit processing across all consumers */
111111
globalRateLimiter?: GlobalRateLimiter;
112+
/** Maximum worker queue depth before claiming pauses (protects visibility timeouts) */
113+
workerQueueMaxDepth?: number;
112114
/** Retry configuration for failed batch items */
113115
retry?: {
114116
/** Maximum number of attempts (including the first). Default: 1 (no retries) */

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import type {
2424
FairQueueKeyProducer,
2525
FairQueueOptions,
2626
FairScheduler,
27-
GlobalRateLimiter,
2827
QueueCooloffState,
2928
QueueDescriptor,
3029
SchedulerContext,
@@ -97,8 +96,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
9796
private maxCooloffStatesSize: number;
9897
private queueCooloffStates = new Map<string, QueueCooloffState>();
9998

100-
// Global rate limiter
101-
private globalRateLimiter?: GlobalRateLimiter;
99+
// Worker queue backpressure
100+
private workerQueueMaxDepth: number;
101+
private workerQueueDepthCheckId?: string;
102102

103103
// Consumer tracing
104104
private consumerTraceMaxIterations: number;
@@ -152,8 +152,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
152152
this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
153153
this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;
154154

155-
// Global rate limiter
156-
this.globalRateLimiter = options.globalRateLimiter;
155+
// Worker queue backpressure
156+
this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
157+
this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;
157158

158159
// Consumer tracing
159160
this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500;
@@ -1110,16 +1111,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11101111
maxClaimCount = Math.min(maxClaimCount, availableCapacity);
11111112
}
11121113

1113-
// Check global rate limit - wait if rate limited
1114-
if (this.globalRateLimiter) {
1115-
const result = await this.globalRateLimiter.limit();
1116-
if (!result.allowed && result.resetAt) {
1117-
const waitMs = Math.max(0, result.resetAt - Date.now());
1118-
if (waitMs > 0) {
1119-
this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId });
1120-
await new Promise((resolve) => setTimeout(resolve, waitMs));
1121-
}
1114+
// Check worker queue depth to prevent unbounded growth.
1115+
// Messages in the worker queue are already in-flight with a visibility timeout.
1116+
// If the queue is too deep, consumers can't keep up, and messages risk timing out.
1117+
if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) {
1118+
const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId);
1119+
if (depth >= this.workerQueueMaxDepth) {
1120+
return 0;
11221121
}
1122+
// Cap claim size to remaining capacity so we don't overshoot the depth limit
1123+
const remainingCapacity = this.workerQueueMaxDepth - depth;
1124+
maxClaimCount = Math.min(maxClaimCount, remainingCapacity);
11231125
}
11241126

11251127
// Claim batch of messages with visibility timeout

0 commit comments

Comments
 (0)