Commit a056306

Better retries and more tests
1 parent dfdc94e commit a056306

File tree: 2 files changed (+824, −29 lines)


internal-packages/run-engine/src/engine/locking.ts

Lines changed: 161 additions & 19 deletions
@@ -22,6 +22,23 @@ const SemanticAttributes = {
   LOCK_SUCCESS: "run_engine.lock.success",
 };
 
+export class LockAcquisitionTimeoutError extends Error {
+  constructor(
+    public readonly resources: string[],
+    public readonly totalWaitTime: number,
+    public readonly attempts: number,
+    message?: string
+  ) {
+    super(
+      message ||
+        `Failed to acquire lock on resources [${resources.join(
+          ", "
+        )}] after ${totalWaitTime}ms and ${attempts} attempts`
+    );
+    this.name = "LockAcquisitionTimeoutError";
+  }
+}
+
 interface LockContext {
   resources: string;
   signal: redlock.RedlockAbortSignal;
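
The new LockAcquisitionTimeoutError carries structured context about the failed acquisition. A minimal sketch of how a caller might handle it (not part of this commit; `doLockedWork` is a hypothetical stand-in for any call that ends up in RunLocker's #acquireAndExecute, and the import path is assumed):

```ts
import { LockAcquisitionTimeoutError } from "./locking";

// Hypothetical placeholder for any code path that acquires a run lock.
declare function doLockedWork(): Promise<void>;

async function handleLockTimeout() {
  try {
    await doLockedWork();
  } catch (error) {
    if (error instanceof LockAcquisitionTimeoutError) {
      // Structured fields exposed by the new error class.
      console.warn("Lock acquisition timed out", {
        resources: error.resources,
        waitedMs: error.totalWaitTime,
        attempts: error.attempts,
      });
      return;
    }
    throw error;
  }
}
```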
@@ -34,6 +51,21 @@ interface ManualLockContext {
   extension: Promise<void> | undefined;
 }
 
+interface RetryConfig {
+  /** Maximum number of retry attempts (default: 10) */
+  maxRetries?: number;
+  /** Initial delay in milliseconds (default: 200) */
+  baseDelay?: number;
+  /** Maximum delay cap in milliseconds (default: 5000) */
+  maxDelay?: number;
+  /** Exponential backoff multiplier (default: 1.5) */
+  backoffMultiplier?: number;
+  /** Jitter factor as percentage (default: 0.1 for 10%) */
+  jitterFactor?: number;
+  /** Maximum total wait time in milliseconds (default: 30000) */
+  maxTotalWaitTime?: number;
+}
+
 export class RunLocker {
   private redlock: InstanceType<typeof redlock.default>;
   private asyncLocalStorage: AsyncLocalStorage<LockContext>;
@@ -43,8 +75,15 @@ export class RunLocker {
   private activeLocks: Map<string, { lockType: string; resources: string[] }> = new Map();
   private activeManualContexts: Map<string, ManualLockContext> = new Map();
   private lockDurationHistogram: Histogram;
-
-  constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) {
+  private retryConfig: Required<RetryConfig>;
+
+  constructor(options: {
+    redis: Redis;
+    logger: Logger;
+    tracer: Tracer;
+    meter?: Meter;
+    retryConfig?: RetryConfig;
+  }) {
     this.redlock = new Redlock([options.redis], {
       driftFactor: 0.01,
       retryCount: 10,
@@ -57,6 +96,16 @@ export class RunLocker {
     this.tracer = options.tracer;
     this.meter = options.meter ?? getMeter("run-engine");
 
+    // Initialize retry configuration with defaults
+    this.retryConfig = {
+      maxRetries: options.retryConfig?.maxRetries ?? 10,
+      baseDelay: options.retryConfig?.baseDelay ?? 200,
+      maxDelay: options.retryConfig?.maxDelay ?? 5000,
+      backoffMultiplier: options.retryConfig?.backoffMultiplier ?? 1.5,
+      jitterFactor: options.retryConfig?.jitterFactor ?? 0.1,
+      maxTotalWaitTime: options.retryConfig?.maxTotalWaitTime ?? 30000,
+    };
+
     const activeLocksObservableGauge = this.meter.createObservableGauge("run_engine.locks.active", {
       description: "The number of active locks by type",
       unit: "locks",
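
A hedged usage sketch of the new constructor option: only the fields you pass are overridden, the rest fall back to the defaults above. `redis`, `logger`, and `tracer` are assumed to be constructed the same way as before this commit (their types are unchanged and omitted here), and the import path is assumed:

```ts
import { RunLocker } from "./locking";

declare const redis: any;  // assumed: same Redis client as before
declare const logger: any; // assumed: same Logger instance as before
declare const tracer: any; // assumed: same Tracer instance as before

const locker = new RunLocker({
  redis,
  logger,
  tracer,
  retryConfig: {
    maxRetries: 5,            // fewer attempts than the default 10
    maxTotalWaitTime: 10_000, // give up after ~10s of cumulative backoff
  },
});

// getRetryConfig() (added further down in this diff) returns the resolved
// config with all defaults filled in.
console.log(locker.getRetryConfig());
```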
@@ -140,7 +189,7 @@ export class RunLocker {
     );
   }
 
-  /** Manual lock acquisition with custom retry logic */
+  /** Manual lock acquisition with exponential backoff retry logic */
   async #acquireAndExecute<T>(
     name: string,
     resources: string[],
@@ -151,36 +200,115 @@ export class RunLocker {
   ): Promise<T> {
     const joinedResources = resources.sort().join(",");
 
-    // Custom retry settings
-    const maxRetries = 10;
-    const baseDelay = 200;
-    const jitter = 200;
+    // Use configured retry settings with exponential backoff
+    const { maxRetries, baseDelay, maxDelay, backoffMultiplier, jitterFactor, maxTotalWaitTime } =
+      this.retryConfig;
 
-    // Retry the lock acquisition specifically using tryCatch
+    // Track timing for total wait time limit
+    const retryStartTime = performance.now();
+    let totalWaitTime = 0;
+
+    // Retry the lock acquisition with exponential backoff
     let lock: redlock.Lock;
+    let lastError: Error | undefined;
+
     for (let attempt = 0; attempt <= maxRetries; attempt++) {
       const [error, acquiredLock] = await tryCatch(this.redlock.acquire(resources, duration));
 
       if (!error && acquiredLock) {
         lock = acquiredLock;
+        if (attempt > 0) {
+          this.logger.debug("[RunLocker] Lock acquired after retries", {
+            name,
+            resources,
+            attempts: attempt + 1,
+            totalWaitTime: Math.round(totalWaitTime),
+          });
+        }
         break;
       }
 
-      // If this is the last attempt, throw the error
+      lastError = error instanceof Error ? error : new Error(String(error));
+
+      // Check if we've exceeded total wait time limit
+      if (totalWaitTime >= maxTotalWaitTime) {
+        this.logger.warn("[RunLocker] Lock acquisition exceeded total wait time limit", {
+          name,
+          resources,
+          attempts: attempt + 1,
+          totalWaitTime: Math.round(totalWaitTime),
+          maxTotalWaitTime,
+        });
+        throw new LockAcquisitionTimeoutError(
+          resources,
+          Math.round(totalWaitTime),
+          attempt + 1,
+          `Lock acquisition on resources [${resources.join(
+            ", "
+          )}] exceeded total wait time limit of ${maxTotalWaitTime}ms`
+        );
+      }
+
+      // If this is the last attempt, throw timeout error
       if (attempt === maxRetries) {
-        throw error || new Error("Failed to acquire lock after maximum retries");
+        this.logger.warn("[RunLocker] Lock acquisition exhausted all retries", {
+          name,
+          resources,
+          attempts: attempt + 1,
+          totalWaitTime: Math.round(totalWaitTime),
+          lastError: lastError.message,
+        });
+        throw new LockAcquisitionTimeoutError(
+          resources,
+          Math.round(totalWaitTime),
+          attempt + 1,
+          `Lock acquisition on resources [${resources.join(", ")}] failed after ${
+            attempt + 1
+          } attempts`
+        );
       }
 
-      // If it's a ResourceLockedError, we should retry
-      if (error && error.name === "ResourceLockedError") {
-        // Calculate delay with jitter
-        const delay = baseDelay + Math.floor((Math.random() * 2 - 1) * jitter);
-        await new Promise((resolve) => setTimeout(resolve, Math.max(0, delay)));
+      // Check if it's a retryable error (lock contention)
+      // ExecutionError: General redlock failure (including lock contention)
+      // ResourceLockedError: Specific lock contention error (if thrown)
+      const isRetryableError =
+        error && (error.name === "ResourceLockedError" || error.name === "ExecutionError");
+
+      if (isRetryableError) {
+        // Calculate exponential backoff delay with jitter and cap
+        const exponentialDelay = Math.min(
+          baseDelay * Math.pow(backoffMultiplier, attempt),
+          maxDelay
+        );
+        const jitter = exponentialDelay * jitterFactor * (Math.random() * 2 - 1); // ±jitterFactor% jitter
+        const delay = Math.max(0, Math.round(exponentialDelay + jitter));
+
+        // Update total wait time before delay
+        totalWaitTime += delay;
+
+        this.logger.debug("[RunLocker] Lock acquisition failed, retrying with backoff", {
+          name,
+          resources,
+          attempt: attempt + 1,
+          delay,
+          totalWaitTime: Math.round(totalWaitTime),
+          error: error.message,
+          errorName: error.name,
+        });
+
+        await new Promise((resolve) => setTimeout(resolve, delay));
         continue;
       }
 
-      // For other errors, throw immediately
-      throw error || new Error("Unknown error during lock acquisition");
+      // For other errors (non-retryable), throw immediately
+      this.logger.error("[RunLocker] Lock acquisition failed with non-retryable error", {
+        name,
+        resources,
+        attempt: attempt + 1,
+        error: lastError.message,
+        errorName: lastError.name,
+      });
+      throw lastError;
     }
 
     // Create an AbortController for our signal
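
For reference, a standalone sketch (not part of the commit) that reproduces the delay formula above with the default configuration and jitter left out, to show how the schedule grows and is capped. With these defaults the ten retry delays sum to roughly 19.9s, comfortably under the 30s maxTotalWaitTime:

```ts
const baseDelay = 200;
const maxDelay = 5000;
const backoffMultiplier = 1.5;

// Delay before retry attempts 1..10 (the ±10% jitter is omitted for clarity).
const schedule = Array.from({ length: 10 }, (_, attempt) =>
  Math.round(Math.min(baseDelay * Math.pow(backoffMultiplier, attempt), maxDelay))
);
console.log(schedule);
// [200, 300, 450, 675, 1013, 1519, 2278, 3417, 5000, 5000]  -> total ≈ 19852ms
```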
@@ -305,9 +433,19 @@ export class RunLocker {
       }
     } else {
       if (context.lock.expiration > Date.now()) {
-        // If lock hasn't expired yet, try again (but only if not cleaned up)
+        // If lock hasn't expired yet, schedule a retry instead of recursing
+        // This prevents stack overflow from repeated extension failures
         if (context.timeout !== null) {
-          return this.#extendLock(context, duration, signal, controller, scheduleNext);
+          const retryDelay = 100; // Short delay before retry
+          context.timeout = setTimeout(() => {
+            context.extension = this.#extendLock(
+              context,
+              duration,
+              signal,
+              controller,
+              scheduleNext
+            );
+          }, retryDelay);
         }
       } else {
         // Lock has expired, abort the signal
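
The change above swaps direct recursion for a timer-scheduled retry. A generic sketch of that pattern (unrelated to the RunLocker internals; names are illustrative): because the next attempt runs from a setTimeout callback, each attempt starts on a fresh call stack instead of stacking frames on every consecutive failure.

```ts
// Illustrative only: retry a failing async task on a timer rather than by
// calling itself directly, so repeated failures never deepen the stack.
function retryOnTimer(task: () => Promise<void>, delayMs = 100): void {
  task().catch(() => {
    setTimeout(() => retryOnTimer(task, delayMs), delayMs);
  });
}
```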
@@ -355,6 +493,10 @@ export class RunLocker {
     return this.asyncLocalStorage.getStore()?.resources;
   }
 
+  getRetryConfig(): Readonly<Required<RetryConfig>> {
+    return { ...this.retryConfig };
+  }
+
   async quit() {
     // Clean up all active manual contexts
     for (const [lockId, context] of this.activeManualContexts) {
