Skip to content

Commit 5cefaf7

Browse files
committed
Further RunLock improvements
1 parent a056306 commit 5cefaf7

File tree

6 files changed

+318
-20
lines changed

6 files changed

+318
-20
lines changed

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,15 @@ const EnvironmentSchema = z.object({
429429
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
430430
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
431431

432+
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
433+
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
434+
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
435+
RUN_ENGINE_RUN_LOCK_BASE_DELAY: z.coerce.number().int().default(100),
436+
RUN_ENGINE_RUN_LOCK_MAX_DELAY: z.coerce.number().int().default(3000),
437+
RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER: z.coerce.number().default(1.8),
438+
RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
439+
RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),
440+
432441
RUN_ENGINE_WORKER_REDIS_HOST: z
433442
.string()
434443
.optional()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,16 @@ function createRunEngine() {
7575
enableAutoPipelining: true,
7676
...(env.RUN_ENGINE_RUN_LOCK_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
7777
},
78+
duration: env.RUN_ENGINE_RUN_LOCK_DURATION,
79+
automaticExtensionThreshold: env.RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD,
80+
retryConfig: {
81+
maxRetries: env.RUN_ENGINE_RUN_LOCK_MAX_RETRIES,
82+
baseDelay: env.RUN_ENGINE_RUN_LOCK_BASE_DELAY,
83+
maxDelay: env.RUN_ENGINE_RUN_LOCK_MAX_DELAY,
84+
backoffMultiplier: env.RUN_ENGINE_RUN_LOCK_BACKOFF_MULTIPLIER,
85+
jitterFactor: env.RUN_ENGINE_RUN_LOCK_JITTER_FACTOR,
86+
maxTotalWaitTime: env.RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME,
87+
},
7888
},
7989
tracer,
8090
meter,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ export class RunEngine {
9898
logger: this.logger,
9999
tracer: trace.getTracer("RunLocker"),
100100
meter: options.meter,
101+
defaultDuration: options.runLock.duration ?? 5000,
102+
automaticExtensionThreshold: options.runLock.automaticExtensionThreshold ?? 1000,
103+
retryConfig: {
104+
maxRetries: 10,
105+
baseDelay: 100,
106+
maxDelay: 3000,
107+
backoffMultiplier: 1.8,
108+
jitterFactor: 0.15,
109+
maxTotalWaitTime: 15000,
110+
...options.runLock.retryConfig,
111+
},
101112
});
102113

103114
const keys = new RunQueueFullKeyProducer();

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 114 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ interface ManualLockContext {
5151
extension: Promise<void> | undefined;
5252
}
5353

54-
interface RetryConfig {
54+
export interface LockRetryConfig {
5555
/** Maximum number of retry attempts (default: 10) */
5656
maxRetries?: number;
5757
/** Initial delay in milliseconds (default: 200) */
@@ -66,6 +66,15 @@ interface RetryConfig {
6666
maxTotalWaitTime?: number;
6767
}
6868

69+
interface LockOptions {
70+
/** Default lock duration in milliseconds (default: 5000) */
71+
defaultDuration?: number;
72+
/** Automatic extension threshold in milliseconds - how early to extend locks before expiration (default: 500) */
73+
automaticExtensionThreshold?: number;
74+
/** Retry configuration for lock acquisition */
75+
retryConfig?: LockRetryConfig;
76+
}
77+
6978
export class RunLocker {
7079
private redlock: InstanceType<typeof redlock.default>;
7180
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
@@ -75,21 +84,29 @@ export class RunLocker {
7584
private activeLocks: Map<string, { lockType: string; resources: string[] }> = new Map();
7685
private activeManualContexts: Map<string, ManualLockContext> = new Map();
7786
private lockDurationHistogram: Histogram;
78-
private retryConfig: Required<RetryConfig>;
87+
private retryConfig: Required<LockRetryConfig>;
88+
private defaultDuration: number;
89+
private automaticExtensionThreshold: number;
7990

8091
constructor(options: {
8192
redis: Redis;
8293
logger: Logger;
8394
tracer: Tracer;
8495
meter?: Meter;
85-
retryConfig?: RetryConfig;
96+
defaultDuration?: number;
97+
automaticExtensionThreshold?: number;
98+
retryConfig?: LockRetryConfig;
8699
}) {
100+
// Initialize configuration values
101+
this.defaultDuration = options.defaultDuration ?? 5000;
102+
this.automaticExtensionThreshold = options.automaticExtensionThreshold ?? 500;
103+
87104
this.redlock = new Redlock([options.redis], {
88105
driftFactor: 0.01,
89-
retryCount: 10,
90-
retryDelay: 200, // time in ms
91-
retryJitter: 200, // time in ms
92-
automaticExtensionThreshold: 500, // time in ms
106+
retryCount: 0, // Disable Redlock's internal retrying - we handle retries ourselves
107+
retryDelay: 200, // Not used since retryCount = 0
108+
retryJitter: 200, // Not used since retryCount = 0
109+
automaticExtensionThreshold: this.automaticExtensionThreshold,
93110
});
94111
this.asyncLocalStorage = new AsyncLocalStorage<LockContext>();
95112
this.logger = options.logger;
@@ -143,20 +160,45 @@ export class RunLocker {
143160
async lock<T>(
144161
name: string,
145162
resources: string[],
146-
duration: number,
163+
duration: number | undefined,
147164
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
165+
): Promise<T>;
166+
async lock<T>(
167+
name: string,
168+
resources: string[],
169+
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
170+
): Promise<T>;
171+
async lock<T>(
172+
name: string,
173+
resources: string[],
174+
durationOrRoutine: number | undefined | ((signal: redlock.RedlockAbortSignal) => Promise<T>),
175+
routine?: (signal: redlock.RedlockAbortSignal) => Promise<T>
148176
): Promise<T> {
149177
const currentContext = this.asyncLocalStorage.getStore();
150178
const joinedResources = resources.sort().join(",");
151179

180+
// Handle overloaded parameters
181+
let actualDuration: number;
182+
let actualRoutine: (signal: redlock.RedlockAbortSignal) => Promise<T>;
183+
184+
if (typeof durationOrRoutine === "function") {
185+
// Called as lock(name, resources, routine) - use default duration
186+
actualDuration = this.defaultDuration;
187+
actualRoutine = durationOrRoutine;
188+
} else {
189+
// Called as lock(name, resources, duration, routine) - use provided duration
190+
actualDuration = durationOrRoutine ?? this.defaultDuration;
191+
actualRoutine = routine!;
192+
}
193+
152194
return startSpan(
153195
this.tracer,
154196
"RunLocker.lock",
155197
async (span) => {
156198
if (currentContext && currentContext.resources === joinedResources) {
157199
span.setAttribute("nested", true);
158200
// We're already inside a lock with the same resources, just run the routine
159-
return routine(currentContext.signal);
201+
return actualRoutine(currentContext.signal);
160202
}
161203

162204
span.setAttribute("nested", false);
@@ -166,7 +208,14 @@ export class RunLocker {
166208
const lockStartTime = performance.now();
167209

168210
const [error, result] = await tryCatch(
169-
this.#acquireAndExecute(name, resources, duration, routine, lockId, lockStartTime)
211+
this.#acquireAndExecute(
212+
name,
213+
resources,
214+
actualDuration,
215+
actualRoutine,
216+
lockId,
217+
lockStartTime
218+
)
170219
);
171220

172221
if (error) {
@@ -177,14 +226,18 @@ export class RunLocker {
177226
[SemanticAttributes.LOCK_SUCCESS]: "false",
178227
});
179228

180-
this.logger.error("[RunLocker] Error locking resources", { error, resources, duration });
229+
this.logger.error("[RunLocker] Error locking resources", {
230+
error,
231+
resources,
232+
duration: actualDuration,
233+
});
181234
throw error;
182235
}
183236

184237
return result;
185238
},
186239
{
187-
attributes: { name, resources, timeout: duration },
240+
attributes: { name, resources, timeout: actualDuration },
188241
}
189242
);
190243
}
@@ -387,15 +440,14 @@ export class RunLocker {
387440
signal: redlock.RedlockAbortSignal,
388441
controller: AbortController
389442
): void {
390-
const automaticExtensionThreshold = 500; // Same as redlock default
391-
392-
if (automaticExtensionThreshold > duration - 100) {
443+
if (this.automaticExtensionThreshold > duration - 100) {
393444
// Don't set up auto-extension if duration is too short
394445
return;
395446
}
396447

397448
const scheduleExtension = (): void => {
398-
const timeUntilExtension = context.lock.expiration - Date.now() - automaticExtensionThreshold;
449+
const timeUntilExtension =
450+
context.lock.expiration - Date.now() - this.automaticExtensionThreshold;
399451

400452
if (timeUntilExtension > 0) {
401453
context.timeout = setTimeout(() => {
@@ -475,13 +527,47 @@ export class RunLocker {
475527
condition: boolean,
476528
name: string,
477529
resources: string[],
478-
duration: number,
530+
duration: number | undefined,
531+
routine: (signal?: redlock.RedlockAbortSignal) => Promise<T>
532+
): Promise<T>;
533+
async lockIf<T>(
534+
condition: boolean,
535+
name: string,
536+
resources: string[],
479537
routine: (signal?: redlock.RedlockAbortSignal) => Promise<T>
538+
): Promise<T>;
539+
async lockIf<T>(
540+
condition: boolean,
541+
name: string,
542+
resources: string[],
543+
durationOrRoutine: number | undefined | ((signal?: redlock.RedlockAbortSignal) => Promise<T>),
544+
routine?: (signal?: redlock.RedlockAbortSignal) => Promise<T>
480545
): Promise<T> {
481546
if (condition) {
482-
return this.lock(name, resources, duration, routine);
547+
// Handle overloaded parameters
548+
if (typeof durationOrRoutine === "function") {
549+
// Called as lockIf(condition, name, resources, routine) - use default duration
550+
return this.lock(
551+
name,
552+
resources,
553+
durationOrRoutine as (signal: redlock.RedlockAbortSignal) => Promise<T>
554+
);
555+
} else {
556+
// Called as lockIf(condition, name, resources, duration, routine) - use provided duration
557+
return this.lock(
558+
name,
559+
resources,
560+
durationOrRoutine,
561+
routine! as (signal: redlock.RedlockAbortSignal) => Promise<T>
562+
);
563+
}
483564
} else {
484-
return routine();
565+
// Handle overloaded parameters for non-lock case
566+
if (typeof durationOrRoutine === "function") {
567+
return durationOrRoutine();
568+
} else {
569+
return routine!();
570+
}
485571
}
486572
}
487573

@@ -493,10 +579,18 @@ export class RunLocker {
493579
return this.asyncLocalStorage.getStore()?.resources;
494580
}
495581

496-
getRetryConfig(): Readonly<Required<RetryConfig>> {
582+
getRetryConfig(): Readonly<Required<LockRetryConfig>> {
497583
return { ...this.retryConfig };
498584
}
499585

586+
getDefaultDuration(): number {
587+
return this.defaultDuration;
588+
}
589+
590+
getAutomaticExtensionThreshold(): number {
591+
return this.automaticExtensionThreshold;
592+
}
593+
500594
async quit() {
501595
// Clean up all active manual contexts
502596
for (const [lockId, context] of this.activeManualContexts) {

0 commit comments

Comments
 (0)