Skip to content

Commit c8a1df0

Browse files
committed
Add BATCH_QUEUE_WORKER_QUEUE_ENABLED to control which service dequeues from the worker queue
1 parent babb92d commit c8a1df0

File tree

2 files changed

+6
-2
lines changed

2 files changed

+6
-2
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,9 @@ const EnvironmentSchema = z
955955
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
956956
// Maximum queues to fetch from master queue per iteration
957957
BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
958-
// Worker queue blocking timeout in seconds (for two-stage processing)
958+
// Enable worker queue for two-stage processing (claim messages, push to worker queue, process from worker queue)
959+
BATCH_QUEUE_WORKER_QUEUE_ENABLED: BoolEnv.default(true),
960+
// Worker queue blocking timeout in seconds (for two-stage processing, only used when BATCH_QUEUE_WORKER_QUEUE_ENABLED is true)
959961
BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
960962
// Global rate limit: max items processed per second across all consumers
961963
// If not set, no global rate limiting is applied

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ function createRunEngine() {
174174
masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT,
175175
},
176176
shardCount: env.BATCH_QUEUE_SHARD_COUNT,
177-
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS,
177+
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_ENABLED
178+
? env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS
179+
: undefined,
178180
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
179181
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
180182
// Default processing concurrency when no specific limit is set

0 commit comments

Comments
 (0)