Skip to content

Commit a93b54c

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/ecr-support
2 parents 14cbc22 + 1be4fcb commit a93b54c

File tree

6 files changed

+227
-153
lines changed

6 files changed

+227
-153
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -462,13 +462,13 @@ export class RunsReplicationService {
462462

463463
// Insert task runs and payloads with retry logic for connection errors
464464
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
465-
() => this.#insertTaskRunInserts(taskRunInserts),
465+
(attempt) => this.#insertTaskRunInserts(taskRunInserts, attempt),
466466
"task run inserts",
467467
flushId
468468
);
469469

470470
const [payloadError, payloadResult] = await this.#insertWithRetry(
471-
() => this.#insertPayloadInserts(payloadInserts),
471+
(attempt) => this.#insertPayloadInserts(payloadInserts, attempt),
472472
"payload inserts",
473473
flushId
474474
);
@@ -502,24 +502,25 @@ export class RunsReplicationService {
502502

503503
// New method to handle inserts with retry logic for connection errors
504504
async #insertWithRetry<T>(
505-
insertFn: () => Promise<T>,
505+
insertFn: (attempt: number) => Promise<T>,
506506
operationName: string,
507507
flushId: string
508508
): Promise<[Error | null, T | null]> {
509509
let lastError: Error | null = null;
510510

511511
for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) {
512512
try {
513-
const result = await insertFn();
513+
const result = await insertFn(attempt);
514514
return [null, result];
515515
} catch (error) {
516516
lastError = error instanceof Error ? error : new Error(String(error));
517517

518518
// Check if this is a retryable connection error
519-
if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) {
519+
if (this.#isRetryableConnectionError(lastError)) {
520520
const delay = this.#calculateConnectionRetryDelay(attempt);
521521

522-
this.logger.warn(`Retrying ${operationName} due to connection error`, {
522+
this.logger.warn(`Retrying RunReplication insert due to connection error`, {
523+
operationName,
523524
flushId,
524525
attempt,
525526
maxRetries: this._insertMaxRetries,
@@ -567,7 +568,7 @@ export class RunsReplicationService {
567568
return delay + jitter;
568569
}
569570

570-
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
571+
async #insertTaskRunInserts(taskRunInserts: TaskRunV2[], attempt: number) {
571572
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
572573
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
573574
taskRunInserts,
@@ -581,18 +582,20 @@ export class RunsReplicationService {
581582
);
582583

583584
if (insertError) {
584-
this.logger.error("Error inserting task run inserts", {
585+
this.logger.error("Error inserting task run inserts attempt", {
585586
error: insertError,
587+
attempt,
586588
});
587589

588590
recordSpanError(span, insertError);
591+
throw insertError;
589592
}
590593

591594
return insertResult;
592595
});
593596
}
594597

595-
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) {
598+
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[], attempt: number) {
596599
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
597600
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
598601
payloadInserts,
@@ -606,11 +609,13 @@ export class RunsReplicationService {
606609
);
607610

608611
if (insertError) {
609-
this.logger.error("Error inserting payload inserts", {
612+
this.logger.error("Error inserting payload inserts attempt", {
610613
error: insertError,
614+
attempt,
611615
});
612616

613617
recordSpanError(span, insertError);
618+
throw insertError;
614619
}
615620

616621
return insertResult;

docs/triggering.mdx

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ Triggers a single run of a task with the payload you pass in, and any options yo
238238
</Note>
239239

240240
```ts ./trigger/my-task.ts
241-
import { myOtherTask, runs } from "~/trigger/my-other-task";
241+
import { runs } from "@trigger.dev/sdk/v3";
242+
import { myOtherTask } from "~/trigger/my-other-task";
242243

243244
export const myTask = task({
244245
id: "my-task",
@@ -254,7 +255,8 @@ export const myTask = task({
254255
To pass options to the triggered task, you can use the second argument:
255256

256257
```ts ./trigger/my-task.ts
257-
import { myOtherTask, runs } from "~/trigger/my-other-task";
258+
import { runs } from "@trigger.dev/sdk/v3";
259+
import { myOtherTask } from "~/trigger/my-other-task";
258260

259261
export const myTask = task({
260262
id: "my-task",
@@ -272,7 +274,8 @@ export const myTask = task({
272274
Triggers multiple runs of a single task with the payloads you pass in, and any options you specify.
273275

274276
```ts /trigger/my-task.ts
275-
import { myOtherTask, batch } from "~/trigger/my-other-task";
277+
import { batch } from "@trigger.dev/sdk/v3";
278+
import { myOtherTask } from "~/trigger/my-other-task";
276279

277280
export const myTask = task({
278281
id: "my-task",
@@ -288,7 +291,8 @@ export const myTask = task({
288291
If you need to pass options to `batchTrigger`, you can use the second argument:
289292

290293
```ts /trigger/my-task.ts
291-
import { myOtherTask, batch } from "~/trigger/my-other-task";
294+
import { batch } from "@trigger.dev/sdk/v3";
295+
import { myOtherTask } from "~/trigger/my-other-task";
292296

293297
export const myTask = task({
294298
id: "my-task",
@@ -306,7 +310,8 @@ export const myTask = task({
306310
You can also pass in options for each run in the batch:
307311

308312
```ts /trigger/my-task.ts
309-
import { myOtherTask, batch } from "~/trigger/my-other-task";
313+
import { batch } from "@trigger.dev/sdk/v3";
314+
import { myOtherTask } from "~/trigger/my-other-task";
310315

311316
export const myTask = task({
312317
id: "my-task",

0 commit comments

Comments
 (0)