@@ -15,6 +15,8 @@ import { TaskRun } from "@trigger.dev/database";
1515import { nanoid } from "nanoid" ;
1616import EventEmitter from "node:events" ;
1717import pLimit from "p-limit" ;
18+ import { logger } from "./logger.server" ;
19+ import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings" ;
1820
1921interface TransactionEvent < T = any > {
2022 tag : "insert" | "update" | "delete" ;
@@ -51,6 +53,10 @@ export type RunsReplicationServiceOptions = {
5153 logLevel ?: LogLevel ;
5254 tracer ?: Tracer ;
5355 waitForAsyncInsert ?: boolean ;
56+ // Retry configuration for insert operations
57+ insertMaxRetries ?: number ;
58+ insertBaseDelayMs ?: number ;
59+ insertMaxDelayMs ?: number ;
5460} ;
5561
/** One replicated task-run change queued for insertion. */
type TaskRunInsert = {
  /** Row version; written out via `toString()` as the `_version` column. */
  _version: bigint;
  /** The task run row this change applies to. */
  run: TaskRun;
  /** Kind of change; `"delete"` causes the row to be written with `_is_deleted` set to 1. */
  event: "insert" | "update" | "delete";
};
@@ -80,6 +86,10 @@ export class RunsReplicationService {
8086 private _latestCommitEndLsn : string | null = null ;
8187 private _lastAcknowledgedLsn : string | null = null ;
8288 private _acknowledgeInterval : NodeJS . Timeout | null = null ;
89+ // Retry configuration
90+ private _insertMaxRetries : number ;
91+ private _insertBaseDelayMs : number ;
92+ private _insertMaxDelayMs : number ;
8393
8494 public readonly events : EventEmitter < RunsReplicationServiceEvents > ;
8595
@@ -151,6 +161,11 @@ export class RunsReplicationService {
151161 this . _replicationClient . events . on ( "leaderElection" , ( isLeader ) => {
152162 this . logger . info ( "Leader election" , { isLeader } ) ;
153163 } ) ;
164+
165+ // Initialize retry configuration
166+ this . _insertMaxRetries = options . insertMaxRetries ?? 3 ;
167+ this . _insertBaseDelayMs = options . insertBaseDelayMs ?? 100 ;
168+ this . _insertMaxDelayMs = options . insertMaxDelayMs ?? 2000 ;
154169 }
155170
156171 public async shutdown ( ) {
@@ -445,8 +460,37 @@ export class RunsReplicationService {
445460 payloadInserts : payloadInserts . length ,
446461 } ) ;
447462
448- await this . #insertTaskRunInserts( taskRunInserts ) ;
449- await this . #insertPayloadInserts( payloadInserts ) ;
463+ // Insert task runs and payloads with retry logic for connection errors
464+ const [ taskRunError , taskRunResult ] = await this . #insertWithRetry(
465+ ( ) => this . #insertTaskRunInserts( taskRunInserts ) ,
466+ "task run inserts" ,
467+ flushId
468+ ) ;
469+
470+ const [ payloadError , payloadResult ] = await this . #insertWithRetry(
471+ ( ) => this . #insertPayloadInserts( payloadInserts ) ,
472+ "payload inserts" ,
473+ flushId
474+ ) ;
475+
476+ // Log any errors that occurred
477+ if ( taskRunError ) {
478+ this . logger . error ( "Error inserting task run inserts" , {
479+ error : taskRunError ,
480+ flushId,
481+ runIds : taskRunInserts . map ( ( r ) => r . run_id ) ,
482+ } ) ;
483+ recordSpanError ( span , taskRunError ) ;
484+ }
485+
486+ if ( payloadError ) {
487+ this . logger . error ( "Error inserting payload inserts" , {
488+ error : payloadError ,
489+ flushId,
490+ runIds : payloadInserts . map ( ( r ) => r . run_id ) ,
491+ } ) ;
492+ recordSpanError ( span , payloadError ) ;
493+ }
450494
451495 this . logger . debug ( "Flushed inserts" , {
452496 flushId,
@@ -456,6 +500,73 @@ export class RunsReplicationService {
456500 } ) ;
457501 }
458502
/**
 * Runs an insert operation, retrying it when it fails with a transient
 * connection error.
 *
 * The operation is always attempted at least once, even when the configured
 * retry count is zero, negative or NaN. Errors thrown by `insertFn` are
 * captured and returned instead of being rethrown.
 *
 * @param insertFn - Performs the insert; invoked once per attempt.
 * @param operationName - Label interpolated into the retry warning message.
 * @param flushId - Identifier of the current flush, attached to log entries.
 * @returns `[null, result]` on success, or `[error, null]` once a
 *   non-retryable error occurs or the attempts are exhausted.
 */
async #insertWithRetry<T>(
  insertFn: () => Promise<T>,
  operationName: string,
  flushId: string
): Promise<[Error | null, T | null]> {
  // A limit below 1 (or NaN) would skip the loop entirely and report success
  // without ever calling insertFn, silently dropping the batch.
  const configuredAttempts = this._insertMaxRetries;
  const maxAttempts = Number.isNaN(configuredAttempts) ? 1 : Math.max(1, configuredAttempts);

  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await insertFn();
      return [null, result];
    } catch (error) {
      // Normalise non-Error throwables so callers always receive an Error.
      lastError = error instanceof Error ? error : new Error(String(error));

      // Only transient connection failures are retried, and only while
      // attempts remain; everything else is surfaced immediately.
      if (this.#isRetryableConnectionError(lastError) && attempt < maxAttempts) {
        const delay = this.#calculateConnectionRetryDelay(attempt);

        this.logger.warn(`Retrying ${operationName} due to connection error`, {
          flushId,
          attempt,
          maxRetries: maxAttempts,
          error: lastError.message,
          delay,
        });

        await new Promise((resolve) => setTimeout(resolve, delay));
        continue;
      }
      break;
    }
  }

  return [lastError, null];
}
539+
/**
 * Reports whether an error looks like a transient connection failure.
 *
 * The check is a case-insensitive substring search of `error.message`
 * against a fixed list of connection-related phrases.
 *
 * @param error - The error to classify.
 * @returns `true` when the message contains any of the known phrases.
 */
#isRetryableConnectionError(error: Error): boolean {
  const message = error.message.toLowerCase();

  const transientFragments = [
    "socket hang up",
    "econnreset",
    "connection reset",
    "connection refused",
    "connection timeout",
    "network error",
    "read econnreset",
    "write econnreset",
  ];

  for (const fragment of transientFragments) {
    if (message.includes(fragment)) {
      return true;
    }
  }

  return false;
}
556+
/**
 * Computes how long to wait before the next retry of a failed insert.
 *
 * The delay doubles with each attempt (base, base*2, base*4, ...) and is
 * capped at the configured maximum. Up to 100 ms of random jitter is applied
 * so concurrent retries do not line up. The result always stays within
 * `[0, _insertMaxDelayMs]`.
 *
 * @param attempt - 1-based number of the attempt that just failed.
 * @returns The delay in milliseconds.
 */
#calculateConnectionRetryDelay(attempt: number): number {
  // Negative configuration values would otherwise yield a negative delay.
  const maxDelayMs = Math.max(0, this._insertMaxDelayMs);
  const exponentialDelayMs = this._insertBaseDelayMs * Math.pow(2, attempt - 1);
  const cappedDelayMs = Math.min(Math.max(0, exponentialDelayMs), maxDelayMs);

  const jitterMs = Math.random() * 100;
  const jitteredDelayMs = cappedDelayMs + jitterMs;

  if (jitteredDelayMs <= maxDelayMs) {
    return jitteredDelayMs;
  }

  // No headroom left above the capped delay: jitter downward from the cap so
  // the result stays randomised without exceeding the configured maximum.
  return Math.max(0, maxDelayMs - jitterMs);
}
569+
459570 async #insertTaskRunInserts( taskRunInserts : TaskRunV2 [ ] ) {
460571 return await startSpan ( this . _tracer , "insertTaskRunsInserts" , async ( span ) => {
461572 const [ insertError , insertResult ] = await this . options . clickhouse . taskRuns . insert (
@@ -604,6 +715,7 @@ export class RunsReplicationService {
604715 idempotency_key : run . idempotencyKey ?? "" ,
605716 expiration_ttl : run . ttl ?? "" ,
606717 output,
718+ concurrency_key : run . concurrencyKey ?? "" ,
607719 _version : _version . toString ( ) ,
608720 _is_deleted : event === "delete" ? 1 : 0 ,
609721 } ;
@@ -631,6 +743,14 @@ export class RunsReplicationService {
631743 return { data : undefined } ;
632744 }
633745
746+ if ( detectBadJsonStrings ( data ) ) {
747+ this . logger . warn ( "Detected bad JSON strings" , {
748+ data,
749+ dataType,
750+ } ) ;
751+ return { data : undefined } ;
752+ }
753+
634754 const packet = {
635755 data,
636756 dataType,
0 commit comments