Skip to content

Commit b4b4c0f

Browse files
committed
fix(batch-queue): allow batch queue consumers to run independently from the run engine worker
new environment variable `BATCH_QUEUE_WORKER_ENABLED` now can be used independently from `RUN_ENGINE_WORKER_ENABLED`
1 parent 8716752 commit b4b4c0f

File tree

4 files changed

+38
-35
lines changed

4 files changed

+38
-35
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,7 @@ const EnvironmentSchema = z
952952
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
953953
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
954954
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
955+
BATCH_QUEUE_WORKER_ENABLED: BoolEnv.default(true),
955956
// Number of master queue shards for horizontal scaling
956957
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
957958
// Maximum queues to fetch from master queue per iteration

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ function createRunEngine() {
179179
: undefined,
180180
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
181181
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
182+
consumerEnabled: env.BATCH_QUEUE_WORKER_ENABLED,
182183
// Default processing concurrency when no specific limit is set
183184
// This is overridden per-batch based on the plan type at batch creation
184185
defaultConcurrency: env.BATCH_CONCURRENCY_LIMIT_DEFAULT,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -202,9 +202,9 @@ export class RunEngine {
202202
id: payload.waitpointId,
203203
output: payload.error
204204
? {
205-
value: payload.error,
206-
isError: true,
207-
}
205+
value: payload.error,
206+
isError: true,
207+
}
208208
: undefined,
209209
});
210210
},
@@ -329,8 +329,8 @@ export class RunEngine {
329329
});
330330

331331
// Initialize BatchQueue for DRR-based batch processing (if configured)
332-
// Only start consumers if worker is not disabled (same as main worker)
333-
const startConsumers = !options.worker.disabled;
332+
// Only start consumers if consumerDisabled is not set or is false
333+
const startBatchQueueConsumers = options.batchQueue?.consumerEnabled ?? true;
334334

335335
this.batchQueue = new BatchQueue({
336336
redis: {
@@ -348,7 +348,7 @@ export class RunEngine {
348348
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
349349
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
350350
globalRateLimiter: options.batchQueue?.globalRateLimiter,
351-
startConsumers,
351+
startConsumers: startBatchQueueConsumers,
352352
tracer: options.tracer,
353353
meter: options.meter,
354354
});
@@ -357,7 +357,7 @@ export class RunEngine {
357357
consumerCount: options.batchQueue?.consumerCount ?? 2,
358358
drrQuantum: options.batchQueue?.drr?.quantum ?? 5,
359359
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
360-
consumersEnabled: startConsumers,
360+
consumersEnabled: startBatchQueueConsumers,
361361
});
362362

363363
this.runAttemptSystem = new RunAttemptSystem({
@@ -464,18 +464,18 @@ export class RunEngine {
464464
debounce:
465465
debounce.mode === "trailing"
466466
? {
467-
...debounce,
468-
updateData: {
469-
payload,
470-
payloadType,
471-
metadata,
472-
metadataType,
473-
tags,
474-
maxAttempts,
475-
maxDurationInSeconds,
476-
machine,
477-
},
478-
}
467+
...debounce,
468+
updateData: {
469+
payload,
470+
payloadType,
471+
metadata,
472+
metadataType,
473+
tags,
474+
maxAttempts,
475+
maxDurationInSeconds,
476+
machine,
477+
},
478+
}
479479
: debounce,
480480
tx: prisma,
481481
});
@@ -574,8 +574,8 @@ export class RunEngine {
574574
tags.length === 0
575575
? undefined
576576
: {
577-
connect: tags,
578-
},
577+
connect: tags,
578+
},
579579
runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name),
580580
oneTimeUseToken,
581581
parentTaskRunId,
@@ -598,10 +598,10 @@ export class RunEngine {
598598
realtimeStreamsVersion,
599599
debounce: debounce
600600
? {
601-
key: debounce.key,
602-
delay: debounce.delay,
603-
createdAt: new Date(),
604-
}
601+
key: debounce.key,
602+
delay: debounce.delay,
603+
createdAt: new Date(),
604+
}
605605
: undefined,
606606
executionSnapshots: {
607607
create: {
@@ -1750,17 +1750,17 @@ export class RunEngine {
17501750
const error =
17511751
latestSnapshot.environmentType === "DEVELOPMENT"
17521752
? ({
1753-
type: "INTERNAL_ERROR",
1754-
code: taskStalledErrorCode,
1755-
message: errorMessage,
1756-
} satisfies TaskRunInternalError)
1753+
type: "INTERNAL_ERROR",
1754+
code: taskStalledErrorCode,
1755+
message: errorMessage,
1756+
} satisfies TaskRunInternalError)
17571757
: this.options.treatProductionExecutionStallsAsOOM
1758-
? ({
1758+
? ({
17591759
type: "INTERNAL_ERROR",
17601760
code: "TASK_PROCESS_OOM_KILLED",
17611761
message: "Run was terminated due to running out of memory",
17621762
} satisfies TaskRunInternalError)
1763-
: ({
1763+
: ({
17641764
type: "INTERNAL_ERROR",
17651765
code: taskStalledErrorCode,
17661766
message: errorMessage,
@@ -1775,10 +1775,10 @@ export class RunEngine {
17751775
error,
17761776
retry: shouldRetry
17771777
? {
1778-
//250ms in the future
1779-
timestamp: Date.now() + retryDelay,
1780-
delay: retryDelay,
1781-
}
1778+
//250ms in the future
1779+
timestamp: Date.now() + retryDelay,
1780+
delay: retryDelay,
1781+
}
17821782
: undefined,
17831783
},
17841784
forceRequeue: true,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export type RunEngineOptions = {
8080
shardCount?: number;
8181
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
8282
workerQueueBlockingTimeoutSeconds?: number;
83+
consumerEnabled?: boolean;
8384
consumerCount?: number;
8485
consumerIntervalMs?: number;
8586
/** Default processing concurrency per environment when no specific limit is set */

0 commit comments

Comments
 (0)