
Commit 38c4cd2

some fixes

1 parent: c7bfe5c

6 files changed (+119 / -31 lines)

apps/webapp/app/runEngine/services/batchTriggerV2.server.ts

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ export class RunEngineBatchTriggerServiceV2 extends WithRunEngine {
     const batchItems: BatchItem[] = body.items.map((item) => ({
       task: item.task,
       payload: item.payload,
-      payloadType: (item as { payloadType?: string }).payloadType,
+      payloadType: item.options?.payloadType ?? "application/json",
       options: item.options as Record<string, unknown> | undefined,
     }));
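For illustration, a minimal sketch of what the new mapping does (the IncomingItem shape below is assumed for the sketch, not the real SDK type):

type IncomingItem = { task: string; payload: unknown; options?: { payloadType?: string } };

const items: IncomingItem[] = [
  { task: "my-task", payload: { foo: 1 } }, // no payloadType supplied
  { task: "my-task", payload: "hello", options: { payloadType: "text/plain" } },
];

const batchItems = items.map((item) => ({
  task: item.task,
  payload: item.payload,
  // Read payloadType from options (where this commit looks for it) and default to JSON,
  // instead of casting the item itself, which typically yielded undefined.
  payloadType: item.options?.payloadType ?? "application/json",
  options: item.options as Record<string, unknown> | undefined,
}));

// batchItems[0].payloadType === "application/json"
// batchItems[1].payloadType === "text/plain"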

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 49 additions & 18 deletions
@@ -160,23 +160,26 @@ function createRunEngine() {
     },
     // BatchQueue with DRR scheduling for fair batch processing
     // Consumers are controlled by options.worker.disabled (same as main worker)
-    batchQueue: env.BATCH_TRIGGER_WORKER_ENABLED === "true" ? {
-      redis: {
-        keyPrefix: "engine:",
-        port: env.BATCH_TRIGGER_WORKER_REDIS_PORT ?? undefined,
-        host: env.BATCH_TRIGGER_WORKER_REDIS_HOST ?? undefined,
-        username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME ?? undefined,
-        password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD ?? undefined,
-        enableAutoPipelining: true,
-        ...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
-      },
-      drr: {
-        quantum: env.BATCH_QUEUE_DRR_QUANTUM,
-        maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
-      },
-      consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
-      consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
-    } : undefined,
+    batchQueue:
+      env.BATCH_TRIGGER_WORKER_ENABLED === "true"
+        ? {
+            redis: {
+              keyPrefix: "engine:",
+              port: env.BATCH_TRIGGER_WORKER_REDIS_PORT ?? undefined,
+              host: env.BATCH_TRIGGER_WORKER_REDIS_HOST ?? undefined,
+              username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME ?? undefined,
+              password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD ?? undefined,
+              enableAutoPipelining: true,
+              ...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+            },
+            drr: {
+              quantum: env.BATCH_QUEUE_DRR_QUANTUM,
+              maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
+            },
+            consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
+            consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
+          }
+        : undefined,
   });
 
   // Set up BatchQueue callbacks if enabled
@@ -187,6 +190,31 @@ function createRunEngine() {
   return engine;
 }
 
+/**
+ * Normalize the payload from BatchQueue.
+ * The payload might be a JSON string if the SDK sent it pre-serialized.
+ * If it's a JSON string and payloadType is "application/json", parse it
+ * to avoid double-stringification in DefaultPayloadProcessor.
+ */
+function normalizePayload(payload: unknown, payloadType?: string): unknown {
+  // Only normalize for JSON payloads
+  if (payloadType !== "application/json" && payloadType !== undefined) {
+    return payload;
+  }
+
+  // If payload is a string, try to parse it as JSON
+  if (typeof payload === "string") {
+    try {
+      return JSON.parse(payload);
+    } catch {
+      // If it's not valid JSON, return as-is
+      return payload;
+    }
+  }
+
+  return payload;
+}
+
 /**
  * Set up the BatchQueue processing callbacks.
  * These handle creating runs from batch items and completing batches.
@@ -197,6 +225,9 @@ function setupBatchQueueCallbacks(engine: RunEngine) {
     try {
       const triggerTaskService = new TriggerTaskService();
 
+      // Normalize payload to avoid double-stringification
+      const payload = normalizePayload(item.payload, item.payloadType);
+
       const result = await triggerTaskService.call(
         item.task,
         {
@@ -208,7 +239,7 @@ function setupBatchQueueCallbacks(engine: RunEngine) {
          project: { id: meta.projectId },
        } as AuthenticatedEnvironment,
        {
-          payload: item.payload,
+          payload,
          options: {
            ...(item.options as Record<string, unknown>),
            payloadType: item.payloadType,
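A quick, self-contained illustration of the double-stringification this normalization avoids (the payload value is invented; normalizePayload is the function added above, and the second stringify stands in for whatever downstream processing serializes the payload again):

function normalizePayload(payload: unknown, payloadType?: string): unknown {
  if (payloadType !== "application/json" && payloadType !== undefined) return payload;
  if (typeof payload === "string") {
    try {
      return JSON.parse(payload);
    } catch {
      return payload;
    }
  }
  return payload;
}

const preSerialized = '{"userId":42}'; // the SDK already stringified this payload

// Without normalization, a later JSON.stringify nests the string:
JSON.stringify(preSerialized); // '"{\"userId\":42}"'

// With normalization, the object round-trips cleanly:
JSON.stringify(normalizePayload(preSerialized, "application/json")); // '{"userId":42}'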

internal-packages/run-engine/src/batch-queue/drrScheduler.ts

Lines changed: 35 additions & 6 deletions
@@ -174,15 +174,30 @@ export class DRRScheduler {
   }
 
   /**
-   * Record a successful run for a batch.
+   * Record a successful run for a batch and increment processed count.
+   * Returns the new processed count.
    */
-  async recordSuccess(batchId: string, runId: string): Promise<void> {
+  async recordSuccess(batchId: string, runId: string): Promise<number> {
     const runsKey = this.keys.batchRunsKey(batchId);
-    await this.redis.rpush(runsKey, runId);
+    const processedKey = this.keys.batchProcessedCountKey(batchId);
+
+    // Use a pipeline to atomically record and increment
+    const pipeline = this.redis.pipeline();
+    pipeline.rpush(runsKey, runId);
+    pipeline.incr(processedKey);
+
+    const results = await pipeline.exec();
+    // incr result is the second command, returns [error, value]
+    const incrResult = results?.[1];
+    if (incrResult?.[0]) {
+      throw incrResult[0];
+    }
+    return incrResult?.[1] as number;
   }
 
   /**
-   * Record a failure for a batch item.
+   * Record a failure for a batch item and increment processed count.
+   * Returns the new processed count.
    */
   async recordFailure(
     batchId: string,
@@ -194,13 +209,26 @@ export class DRRScheduler {
       error: string;
       errorCode?: string;
     }
-  ): Promise<void> {
+  ): Promise<number> {
     const failuresKey = this.keys.batchFailuresKey(batchId);
+    const processedKey = this.keys.batchProcessedCountKey(batchId);
     const failureRecord = {
       ...failure,
       timestamp: Date.now(),
     };
-    await this.redis.rpush(failuresKey, JSON.stringify(failureRecord));
+
+    // Use a pipeline to atomically record and increment
+    const pipeline = this.redis.pipeline();
+    pipeline.rpush(failuresKey, JSON.stringify(failureRecord));
+    pipeline.incr(processedKey);
+
+    const results = await pipeline.exec();
+    // incr result is the second command, returns [error, value]
+    const incrResult = results?.[1];
+    if (incrResult?.[0]) {
+      throw incrResult[0];
+    }
+    return incrResult?.[1] as number;
   }
 
   /**
@@ -240,6 +268,7 @@
       this.keys.batchMetaKey(batchId),
       this.keys.batchRunsKey(batchId),
       this.keys.batchFailuresKey(batchId),
+      this.keys.batchProcessedCountKey(batchId),
     ];
     await this.redis.del(...keys);
   }
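A hedged, standalone sketch of the guarantee these methods now lean on: Redis executes the pipelined INCR atomically, so each concurrent caller observes a distinct count and exactly one of them sees the final value. The key strings mirror the documented per-batch key format; the runCount check stands in for the finalization logic that lives in BatchQueue:

import Redis from "ioredis";

async function recordAndMaybeFinalize(
  redis: Redis,
  batchId: string,
  runCount: number,
  runId: string
) {
  // Record the run and bump the processed counter in one round trip.
  const pipeline = redis.pipeline();
  pipeline.rpush(`batch:${batchId}:runs`, runId);
  pipeline.incr(`batch:${batchId}:processed`);

  const results = await pipeline.exec();
  const [err, processed] = results?.[1] ?? [null, 0]; // INCR is the second command
  if (err) throw err;

  // Each caller gets a unique counter value, so only one of N concurrent
  // consumers will observe processed === runCount and finalize the batch.
  if (Number(processed) === runCount) {
    // finalize the batch here (handled by BatchQueue in the real code)
  }
}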

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 22 additions & 6 deletions
@@ -278,6 +278,8 @@ export class BatchQueue {
 
   /**
    * Process a single dequeued item.
+   * Uses processed count (not isBatchComplete from dequeue) to determine when to finalize.
+   * This prevents race conditions when multiple consumers process items concurrently.
    */
   async #processItem(dequeued: {
     envId: string;
@@ -288,13 +290,15 @@ export class BatchQueue {
     isBatchComplete: boolean;
     envHasMoreBatches: boolean;
   }): Promise<void> {
-    const { batchId, itemIndex, item, meta, isBatchComplete } = dequeued;
+    const { batchId, itemIndex, item, meta } = dequeued;
 
     if (!this.processItemCallback) {
       this.logger.error("No process item callback set", { batchId, itemIndex });
       return;
     }
 
+    let processedCount: number;
+
     try {
       const result = await this.processItemCallback({
         batchId,
@@ -305,16 +309,18 @@ export class BatchQueue {
       });
 
       if (result.success) {
-        await this.scheduler.recordSuccess(batchId, result.runId);
+        processedCount = await this.scheduler.recordSuccess(batchId, result.runId);
         this.logger.debug("Batch item processed successfully", {
           batchId,
           itemIndex,
           runId: result.runId,
+          processedCount,
+          expectedCount: meta.runCount,
         });
       } else {
         const payloadStr =
           typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
-        await this.scheduler.recordFailure(batchId, {
+        processedCount = await this.scheduler.recordFailure(batchId, {
          index: itemIndex,
          taskIdentifier: item.task,
          payload: payloadStr?.substring(0, 1000), // Truncate large payloads
@@ -326,13 +332,15 @@ export class BatchQueue {
          batchId,
          itemIndex,
          error: result.error,
+          processedCount,
+          expectedCount: meta.runCount,
        });
      }
    } catch (error) {
      // Unexpected error during processing
      const payloadStr =
        typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
-      await this.scheduler.recordFailure(batchId, {
+      processedCount = await this.scheduler.recordFailure(batchId, {
        index: itemIndex,
        taskIdentifier: item.task,
        payload: payloadStr?.substring(0, 1000),
@@ -344,11 +352,19 @@ export class BatchQueue {
        batchId,
        itemIndex,
        error: error instanceof Error ? error.message : String(error),
+        processedCount,
+        expectedCount: meta.runCount,
      });
    }
 
-    // If batch is complete, finalize it
-    if (isBatchComplete) {
+    // Check if all items have been processed using atomic counter
+    // This is safe even with multiple concurrent consumers
+    if (processedCount === meta.runCount) {
+      this.logger.debug("All items processed, finalizing batch", {
+        batchId,
+        processedCount,
+        expectedCount: meta.runCount,
+      });
       await this.#finalizeBatch(batchId, meta);
     }
   }
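To see why finalization now keys off the processed counter rather than the dequeue-time isBatchComplete flag, here is a toy in-memory simulation (not the real BatchQueue; the timeouts and ++processed stand in for task latency and the atomic Redis INCR):

const runCount = 3;
let processed = 0;

async function consume(itemIndex: number, workMs: number) {
  await new Promise((resolve) => setTimeout(resolve, workMs)); // simulated processing latency
  const count = ++processed; // stands in for the atomic Redis INCR
  if (count === runCount) {
    console.log(`consumer handling item ${itemIndex} finalizes the batch`);
  }
}

async function demo() {
  // The consumer that dequeues the *last* item (index 2) finishes first, but the
  // batch is only finalized once the slowest item (index 0) has been processed.
  await Promise.all([consume(0, 300), consume(1, 100), consume(2, 50)]);
}

demo();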

internal-packages/run-engine/src/batch-queue/keyProducer.ts

Lines changed: 10 additions & 0 deletions
@@ -9,6 +9,7 @@ const constants = {
   QUEUE_PART: "queue",
   RUNS_PART: "runs",
   FAILURES_PART: "failures",
+  PROCESSED_COUNT_PART: "processed",
 } as const;
 
 /**
@@ -22,6 +23,7 @@ const constants = {
  * - batch:{batchId}:meta - Per-batch metadata
  * - batch:{batchId}:runs - Per-batch successful runs (list of runIds)
  * - batch:{batchId}:failures - Per-batch failures (list of failure JSON)
+ * - batch:{batchId}:processed - Per-batch processed count (atomic counter)
  */
 export class BatchQueueFullKeyProducer implements BatchQueueKeyProducer {
   /**
@@ -80,6 +82,14 @@ export class BatchQueueFullKeyProducer implements BatchQueueKeyProducer {
     return [constants.BATCH_PART, batchId, constants.FAILURES_PART].join(":");
   }
 
+  /**
+   * Key for a batch's processed count.
+   * Atomic counter tracking how many items have been processed (success or failure).
+   */
+  batchProcessedCountKey(batchId: string): string {
+    return [constants.BATCH_PART, batchId, constants.PROCESSED_COUNT_PART].join(":");
+  }
+
   /**
    * Create a master queue member value from envId and batchId.
    * Format: "{envId}:{batchId}"
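Usage sketch, assuming the producer can be constructed without arguments (the batch id is made up):

const keys = new BatchQueueFullKeyProducer();

keys.batchRunsKey("batch_123");           // "batch:batch_123:runs"
keys.batchFailuresKey("batch_123");       // "batch:batch_123:failures"
keys.batchProcessedCountKey("batch_123"); // "batch:batch_123:processed"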

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 2 additions & 0 deletions
@@ -152,6 +152,8 @@ export interface BatchQueueKeyProducer {
   batchRunsKey(batchId: string): string;
   /** Key for a batch's failure list */
   batchFailuresKey(batchId: string): string;
+  /** Key for a batch's processed count (atomic counter) */
+  batchProcessedCountKey(batchId: string): string;
 
   // Master queue member utilities
   /** Create a master queue member value: "{envId}:{batchId}" */
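Any custom implementation of BatchQueueKeyProducer now has to supply the new method as well; a partial sketch covering only the three per-batch keys shown in this diff:

const perBatchKeys = {
  batchRunsKey: (batchId: string) => `batch:${batchId}:runs`,
  batchFailuresKey: (batchId: string) => `batch:${batchId}:failures`,
  batchProcessedCountKey: (batchId: string) => `batch:${batchId}:processed`,
} satisfies Partial<BatchQueueKeyProducer>;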
