Skip to content

Commit 8d2b9ca

Browse files
committed
fix(batch): move batch queue global rate limiter to worker consumer level
The global rate limiter was being applied at the FairQueue claim phase, consuming 1 token per queue-claim-attempt rather than per item processed. With many small queues (each batch is its own queue), consumers burned through tokens on empty or single-item queues, causing aggressive throttling well below the intended items/sec limit. Changes: - Move rate limiter from FairQueue claim phase to BatchQueue worker queue consumer loop (before blockingPop), so each token = 1 item processed - Replace the FairQueue rate limiter with a worker queue depth cap to prevent unbounded growth that could cause visibility timeouts - Add BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH env var (optional, disabled by default)
1 parent b192b71 commit 8d2b9ca

File tree

11 files changed

+258
-19
lines changed

11 files changed

+258
-19
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Move batch queue global rate limiter from FairQueue claim phase to BatchQueue worker queue consumer for accurate per-item rate limiting. Add worker queue depth cap to prevent unbounded growth that could cause visibility timeouts.

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,9 @@ const EnvironmentSchema = z
993993
// Global rate limit: max items processed per second across all consumers
994994
// If not set, no global rate limiting is applied
995995
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
996+
// Max items in the worker queue before claiming pauses (protects against visibility timeouts)
997+
// If not set, no depth limit is applied
998+
BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH: z.coerce.number().int().positive().optional(),
996999

9971000
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
9981001
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ function createRunEngine() {
198198
globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT
199199
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)
200200
: undefined,
201+
// Worker queue depth cap - prevents unbounded growth, protecting against visibility timeouts
202+
workerQueueMaxDepth: env.BATCH_QUEUE_WORKER_QUEUE_MAX_DEPTH,
201203
retry: {
202204
maxAttempts: 6,
203205
minTimeoutInMs: 1_000,

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
isAbortError,
1919
WorkerQueueManager,
2020
type FairQueueOptions,
21+
type GlobalRateLimiter,
2122
} from "@trigger.dev/redis-worker";
2223
import { BatchCompletionTracker } from "./completionTracker.js";
2324
import type {
@@ -76,6 +77,7 @@ export class BatchQueue {
7677
private abortController: AbortController;
7778
private workerQueueConsumerLoops: Promise<void>[] = [];
7879
private workerQueueBlockingTimeoutSeconds: number;
80+
private globalRateLimiter?: GlobalRateLimiter;
7981
private batchedSpanManager: BatchedSpanManager;
8082

8183
// Metrics
@@ -95,6 +97,7 @@ export class BatchQueue {
9597
this.maxAttempts = options.retry?.maxAttempts ?? 1;
9698
this.abortController = new AbortController();
9799
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;
100+
this.globalRateLimiter = options.globalRateLimiter;
98101

99102
// Initialize metrics if meter is provided
100103
if (options.meter) {
@@ -174,8 +177,9 @@ export class BatchQueue {
174177
},
175178
},
176179
],
177-
// Optional global rate limiter to limit max items/sec across all consumers
178-
globalRateLimiter: options.globalRateLimiter,
180+
// Worker queue depth cap to prevent unbounded growth (protects against visibility timeouts)
181+
workerQueueMaxDepth: options.workerQueueMaxDepth,
182+
workerQueueDepthCheckId: BATCH_WORKER_QUEUE_ID,
179183
// Enable retry with DLQ disabled when retry config is provided.
180184
// BatchQueue handles the "final failure" in its own processing loop,
181185
// so we don't need the DLQ - we just need the retry scheduling.
@@ -641,6 +645,17 @@ export class BatchQueue {
641645
}
642646

643647
try {
648+
// Rate limit per-item at the processing level (1 token per message)
649+
if (this.globalRateLimiter) {
650+
const result = await this.globalRateLimiter.limit();
651+
if (!result.allowed && result.resetAt) {
652+
const waitMs = Math.max(0, result.resetAt - Date.now());
653+
if (waitMs > 0) {
654+
await new Promise((resolve) => setTimeout(resolve, waitMs));
655+
}
656+
}
657+
}
658+
644659
await this.batchedSpanManager.withBatchedSpan(
645660
loopId,
646661
async (span) => {

internal-packages/run-engine/src/batch-queue/tests/index.test.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { redisTest } from "@internal/testcontainers";
22
import { describe, expect, vi } from "vitest";
33
import { BatchQueue } from "../index.js";
4+
import type { GlobalRateLimiter } from "@trigger.dev/redis-worker";
45
import type { CompleteBatchResult, InitializeBatchOptions, BatchItem } from "../types.js";
56

67
vi.setConfig({ testTimeout: 60_000 });
@@ -658,4 +659,121 @@ describe("BatchQueue", () => {
658659
}
659660
);
660661
});
662+
663+
describe("global rate limiter at worker queue consumer level", () => {
664+
redisTest(
665+
"should call rate limiter once per item processed",
666+
async ({ redisContainer }) => {
667+
let limitCallCount = 0;
668+
const rateLimiter: GlobalRateLimiter = {
669+
async limit() {
670+
limitCallCount++;
671+
return { allowed: true };
672+
},
673+
};
674+
675+
const queue = new BatchQueue({
676+
redis: {
677+
host: redisContainer.getHost(),
678+
port: redisContainer.getPort(),
679+
keyPrefix: "test:",
680+
},
681+
drr: { quantum: 5, maxDeficit: 50 },
682+
consumerCount: 1,
683+
consumerIntervalMs: 50,
684+
startConsumers: true,
685+
globalRateLimiter: rateLimiter,
686+
});
687+
688+
let completionResult: CompleteBatchResult | null = null;
689+
690+
try {
691+
queue.onProcessItem(async ({ itemIndex }) => {
692+
return { success: true, runId: `run_${itemIndex}` };
693+
});
694+
695+
queue.onBatchComplete(async (result) => {
696+
completionResult = result;
697+
});
698+
699+
const itemCount = 5;
700+
await queue.initializeBatch(createInitOptions("batch1", "env1", itemCount));
701+
await enqueueItems(queue, "batch1", "env1", createBatchItems(itemCount));
702+
703+
await vi.waitFor(
704+
() => {
705+
expect(completionResult).not.toBeNull();
706+
},
707+
{ timeout: 10000 }
708+
);
709+
710+
expect(completionResult!.successfulRunCount).toBe(itemCount);
711+
// Rate limiter should be called at least once per item processed
712+
// (may be called more due to consumer loop iterations with empty pops)
713+
expect(limitCallCount).toBeGreaterThanOrEqual(itemCount);
714+
} finally {
715+
await queue.close();
716+
}
717+
}
718+
);
719+
720+
redisTest(
721+
"should delay processing when rate limited",
722+
async ({ redisContainer }) => {
723+
let limitCallCount = 0;
724+
const rateLimiter: GlobalRateLimiter = {
725+
async limit() {
726+
limitCallCount++;
727+
// Rate limit the first 3 calls, then allow
728+
if (limitCallCount <= 3) {
729+
return { allowed: false, resetAt: Date.now() + 100 };
730+
}
731+
return { allowed: true };
732+
},
733+
};
734+
735+
const queue = new BatchQueue({
736+
redis: {
737+
host: redisContainer.getHost(),
738+
port: redisContainer.getPort(),
739+
keyPrefix: "test:",
740+
},
741+
drr: { quantum: 5, maxDeficit: 50 },
742+
consumerCount: 1,
743+
consumerIntervalMs: 50,
744+
startConsumers: true,
745+
globalRateLimiter: rateLimiter,
746+
});
747+
748+
let completionResult: CompleteBatchResult | null = null;
749+
750+
try {
751+
queue.onProcessItem(async ({ itemIndex }) => {
752+
return { success: true, runId: `run_${itemIndex}` };
753+
});
754+
755+
queue.onBatchComplete(async (result) => {
756+
completionResult = result;
757+
});
758+
759+
await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
760+
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));
761+
762+
// Should still complete despite initial rate limiting
763+
await vi.waitFor(
764+
() => {
765+
expect(completionResult).not.toBeNull();
766+
},
767+
{ timeout: 10000 }
768+
);
769+
770+
expect(completionResult!.successfulRunCount).toBe(3);
771+
// Rate limiter was called more times than items due to initial rejections
772+
expect(limitCallCount).toBeGreaterThan(3);
773+
} finally {
774+
await queue.close();
775+
}
776+
}
777+
);
778+
});
661779
});

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,15 @@ export type BatchQueueOptions = {
213213
/**
214214
* Optional global rate limiter to limit processing across all consumers.
215215
* When configured, limits the max items/second processed globally.
216+
* Rate limiting happens at the worker queue consumer level (1 token per item).
216217
*/
217218
globalRateLimiter?: GlobalRateLimiter;
219+
/**
220+
* Maximum number of items allowed in the worker queue before claiming pauses.
221+
* Prevents unbounded worker queue growth which could cause visibility timeouts.
222+
* Disabled by default (undefined = no limit).
223+
*/
224+
workerQueueMaxDepth?: number;
218225
/** Logger instance */
219226
logger?: Logger;
220227
logLevel?: LogLevel;

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ export class RunEngine {
398398
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
399399
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
400400
globalRateLimiter: options.batchQueue?.globalRateLimiter,
401+
workerQueueMaxDepth: options.batchQueue?.workerQueueMaxDepth,
401402
startConsumers: startBatchQueueConsumers,
402403
retry: options.batchQueue?.retry,
403404
tracer: options.tracer,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ export type RunEngineOptions = {
109109
defaultConcurrency?: number;
110110
/** Optional global rate limiter to limit processing across all consumers */
111111
globalRateLimiter?: GlobalRateLimiter;
112+
/** Maximum worker queue depth before claiming pauses (protects against visibility timeouts) */
113+
workerQueueMaxDepth?: number;
112114
/** Retry configuration for failed batch items */
113115
retry?: {
114116
/** Maximum number of attempts (including the first). Default: 1 (no retries) */

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import type {
2424
FairQueueKeyProducer,
2525
FairQueueOptions,
2626
FairScheduler,
27-
GlobalRateLimiter,
2827
QueueCooloffState,
2928
QueueDescriptor,
3029
SchedulerContext,
@@ -97,8 +96,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
9796
private maxCooloffStatesSize: number;
9897
private queueCooloffStates = new Map<string, QueueCooloffState>();
9998

100-
// Global rate limiter
101-
private globalRateLimiter?: GlobalRateLimiter;
99+
// Worker queue backpressure
100+
private workerQueueMaxDepth: number;
101+
private workerQueueDepthCheckId?: string;
102102

103103
// Consumer tracing
104104
private consumerTraceMaxIterations: number;
@@ -152,8 +152,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
152152
this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
153153
this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;
154154

155-
// Global rate limiter
156-
this.globalRateLimiter = options.globalRateLimiter;
155+
// Worker queue backpressure
156+
this.workerQueueMaxDepth = options.workerQueueMaxDepth ?? 0;
157+
this.workerQueueDepthCheckId = options.workerQueueDepthCheckId;
157158

158159
// Consumer tracing
159160
this.consumerTraceMaxIterations = options.consumerTraceMaxIterations ?? 500;
@@ -1110,15 +1111,13 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11101111
maxClaimCount = Math.min(maxClaimCount, availableCapacity);
11111112
}
11121113

1113-
// Check global rate limit - wait if rate limited
1114-
if (this.globalRateLimiter) {
1115-
const result = await this.globalRateLimiter.limit();
1116-
if (!result.allowed && result.resetAt) {
1117-
const waitMs = Math.max(0, result.resetAt - Date.now());
1118-
if (waitMs > 0) {
1119-
this.logger.debug("Global rate limit reached, waiting", { waitMs, loopId });
1120-
await new Promise((resolve) => setTimeout(resolve, waitMs));
1121-
}
1114+
// Check worker queue depth to prevent unbounded growth.
1115+
// Messages in the worker queue are already in-flight with a visibility timeout.
1116+
// If the queue is too deep, consumers can't keep up, and messages risk timing out.
1117+
if (this.workerQueueMaxDepth > 0 && this.workerQueueDepthCheckId) {
1118+
const depth = await this.workerQueueManager.getLength(this.workerQueueDepthCheckId);
1119+
if (depth >= this.workerQueueMaxDepth) {
1120+
return 0;
11221121
}
11231122
}
11241123

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,4 +1282,76 @@ describe("FairQueue", () => {
12821282
);
12831283
});
12841284

1285+
describe("worker queue depth cap", () => {
1286+
redisTest(
1287+
"should respect worker queue max depth and resume after draining",
1288+
{ timeout: 30000 },
1289+
async ({ redisOptions }) => {
1290+
const processed: string[] = [];
1291+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
1292+
1293+
const scheduler = new DRRScheduler({
1294+
redis: redisOptions,
1295+
keys,
1296+
quantum: 100,
1297+
maxDeficit: 1000,
1298+
});
1299+
1300+
const workerQueueManager = new WorkerQueueManager({
1301+
redis: redisOptions,
1302+
keys,
1303+
});
1304+
1305+
// Create FairQueue with a small depth cap
1306+
const maxDepth = 3;
1307+
const queue = new TestFairQueueHelper(redisOptions, keys, {
1308+
scheduler,
1309+
payloadSchema: TestPayloadSchema,
1310+
shardCount: 1,
1311+
consumerCount: 1,
1312+
consumerIntervalMs: 50,
1313+
visibilityTimeoutMs: 30000,
1314+
workerQueueMaxDepth: maxDepth,
1315+
workerQueueDepthCheckId: TEST_WORKER_QUEUE_ID,
1316+
startConsumers: false,
1317+
});
1318+
1319+
// Use a slow handler to let the worker queue build up
1320+
queue.onMessage(async (ctx) => {
1321+
await new Promise((resolve) => setTimeout(resolve, 200));
1322+
processed.push(ctx.message.payload.value);
1323+
await ctx.complete();
1324+
});
1325+
1326+
// Enqueue 10 messages
1327+
const totalMessages = 10;
1328+
for (let i = 0; i < totalMessages; i++) {
1329+
await queue.enqueue({
1330+
queueId: "tenant:t1:queue:q1",
1331+
tenantId: "t1",
1332+
payload: { value: `msg-${i}` },
1333+
});
1334+
}
1335+
1336+
// Start processing
1337+
queue.start();
1338+
1339+
// Verify all messages eventually get processed (depth cap doesn't permanently block)
1340+
await vi.waitFor(
1341+
() => {
1342+
expect(processed.length).toBe(totalMessages);
1343+
},
1344+
{ timeout: 25000 }
1345+
);
1346+
1347+
// Verify the worker queue is drained
1348+
const finalDepth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID);
1349+
expect(finalDepth).toBe(0);
1350+
1351+
await workerQueueManager.close();
1352+
await queue.close();
1353+
}
1354+
);
1355+
});
1356+
12851357
});

0 commit comments

Comments
 (0)