Skip to content

Commit 4ec0706

Browse files
committed
WIP runlock rewrite
1 parent 903dd43 commit 4ec0706

File tree

1 file changed

+188
-1
lines changed

1 file changed

+188
-1
lines changed

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,199 @@ export class RunLocker {
165165
);
166166
}
167167

168+
/**
 * Acquires a lock over `resources` using a custom retry loop, runs `routine`
 * while the lock is held, and releases the lock afterwards.
 *
 * Acquisition is attempted up to `maxRetries + 1` times. Only errors whose
 * `name` is "ResourceLockedError" are retried (after a jittered delay); any
 * other error is rethrown immediately.
 *
 * While the routine runs, the lock is registered in `activeLocks` under
 * `lockId` and auto-extension is set up. Whether the routine succeeds or
 * throws, the elapsed time since `lockStartTime` is recorded in the lock
 * duration histogram, the `activeLocks` entry is removed, the extension
 * mechanism is cleaned up and the lock is released. A release failure is
 * logged as a warning and never replaces the routine's own result or error.
 *
 * @param name - Lock type; used for the async-local context, tracking and metrics.
 * @param resources - Resource keys to lock. The array is not modified.
 * @param duration - Lock duration passed to acquire/extend.
 * @param routine - Work to run while holding the lock; receives the abort signal.
 * @param lockId - Key under which the lock is tracked in `activeLocks`.
 * @param lockStartTime - `performance.now()`-based start time used for the duration metric.
 * @returns Whatever `routine` resolves to.
 * @throws The acquisition error (or a generic Error) when the lock cannot be
 *   acquired, or whatever `routine` throws.
 */
async #acquireAndExecute<T>(
  name: string,
  resources: string[],
  duration: number,
  routine: (signal: redlock.RedlockAbortSignal) => Promise<T>,
  lockId: string,
  lockStartTime: number
): Promise<T> {
  // Sort a copy: Array.prototype.sort works in place and would otherwise
  // reorder the caller's array as a hidden side effect.
  const sortedResources = [...resources].sort();
  const joinedResources = sortedResources.join(",");

  // Custom retry settings
  const maxRetries = 10;
  const baseDelay = 200;
  const jitter = 200;

  // Retry the lock acquisition specifically using tryCatch
  let lock: redlock.Lock | undefined;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const [error, acquiredLock] = await tryCatch(
      this.redlock.acquire(sortedResources, duration)
    );

    if (!error && acquiredLock) {
      lock = acquiredLock;
      break;
    }

    // If this is the last attempt, throw the error
    if (attempt === maxRetries) {
      throw error || new Error("Failed to acquire lock after maximum retries");
    }

    // If it's a ResourceLockedError, we should retry
    if (error && error.name === "ResourceLockedError") {
      // Delay is baseDelay +/- jitter, clamped so it is never negative
      const delay = baseDelay + Math.floor((Math.random() * 2 - 1) * jitter);
      await new Promise((resolve) => setTimeout(resolve, Math.max(0, delay)));
      continue;
    }

    // For other errors, throw immediately
    throw error || new Error("Unknown error during lock acquisition");
  }

  // Every loop exit above either assigns `lock` or throws; this guard narrows
  // the type once instead of scattering non-null assertions below.
  if (!lock) {
    throw new Error("Failed to acquire lock after maximum retries");
  }

  // Create an AbortController for our signal
  const controller = new AbortController();
  const signal = controller.signal as redlock.RedlockAbortSignal;

  const manualContext: ManualLockContext = {
    lock,
    timeout: undefined,
    extension: undefined,
  };

  // Set up auto-extension starting from when lock was actually acquired
  this.#setupAutoExtension(manualContext, duration, signal, controller);

  try {
    const newContext: LockContext = {
      resources: joinedResources,
      signal,
      lockType: name,
    };

    // Track active lock
    this.activeLocks.set(lockId, {
      lockType: name,
      resources: sortedResources,
    });

    let lockSuccess = true;
    try {
      const result = await this.asyncLocalStorage.run(newContext, async () => {
        return routine(signal);
      });

      return result;
    } catch (lockError) {
      lockSuccess = false;
      throw lockError;
    } finally {
      // Record lock duration
      const lockDuration = performance.now() - lockStartTime;
      this.lockDurationHistogram.record(lockDuration, {
        [SemanticAttributes.LOCK_TYPE]: name,
        [SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(),
      });

      // Remove from active locks when done
      this.activeLocks.delete(lockId);
    }
  } finally {
    // Clean up extension mechanism - this ensures auto extension stops after routine finishes
    this.#cleanupExtension(manualContext);

    // Release through the context: auto-extension replaces `manualContext.lock`
    // with the handle returned by each successful extend, so this is the most
    // recent handle for the lock.
    const currentLock = manualContext.lock;
    const [releaseError] = await tryCatch(currentLock.release());
    if (releaseError) {
      this.logger.warn("[RunLocker] Error releasing lock", {
        error: releaseError,
        resources: sortedResources,
        lockValue: currentLock.value,
      });
    }
  }
}
}
273+
274+
/**
 * Schedules automatic extension of the lock held in `context`.
 *
 * Nothing is scheduled when `duration` is too short relative to the extension
 * threshold (threshold > duration - 100). Otherwise a timer is stored in
 * `context.timeout`; when it fires, `#extendLock` is started and its promise
 * is stored in `context.extension`. `#extendLock` calls back into the same
 * scheduler after a successful extension.
 *
 * @param context - Mutable holder for the current lock, pending timer and in-flight extension.
 * @param duration - Lock duration passed on to each extension.
 * @param signal - Abort signal handed to `#extendLock`.
 * @param controller - Controller for `signal`, handed to `#extendLock`.
 */
#setupAutoExtension(
  context: ManualLockContext,
  duration: number,
  signal: redlock.RedlockAbortSignal,
  controller: AbortController
): void {
  const automaticExtensionThreshold = 500; // Same as redlock default

  if (automaticExtensionThreshold > duration - 100) {
    // Don't set up auto-extension if duration is too short
    return;
  }

  const scheduleExtension = (): void => {
    const timeUntilExtension = context.lock.expiration - Date.now() - automaticExtensionThreshold;

    // Always schedule an attempt. If we are already inside the threshold
    // window (or past expiration) the delay is clamped to 0 so the attempt
    // runs right away; skipping it would let the lock lapse with neither an
    // extension nor an abort of the signal.
    context.timeout = setTimeout(() => {
      context.extension = this.#extendLock(
        context,
        duration,
        signal,
        controller,
        scheduleExtension
      );
    }, Math.max(0, timeUntilExtension));
  };

  scheduleExtension();
}
306+
307+
/** Extend a lock */
308+
async #extendLock(
309+
context: ManualLockContext,
310+
duration: number,
311+
signal: redlock.RedlockAbortSignal,
312+
controller: AbortController,
313+
scheduleNext: () => void
314+
): Promise<void> {
315+
context.timeout = undefined;
316+
317+
const [error, newLock] = await tryCatch(context.lock.extend(duration));
318+
319+
if (!error && newLock) {
320+
context.lock = newLock;
321+
// Only schedule next extension if we haven't been cleaned up
322+
if (context.timeout !== null) {
323+
scheduleNext();
324+
}
325+
} else {
326+
if (context.lock.expiration > Date.now()) {
327+
// If lock hasn't expired yet, try again (but only if not cleaned up)
328+
if (context.timeout !== null) {
329+
return this.#extendLock(context, duration, signal, controller, scheduleNext);
330+
}
331+
} else {
332+
// Lock has expired, abort the signal
333+
signal.error = error instanceof Error ? error : new Error(String(error));
334+
controller.abort();
335+
}
336+
}
337+
}
338+
339+
/**
 * Stops the auto-extension mechanism for `context`.
 *
 * Cancels any pending extension timer and sets `context.timeout` to null,
 * which `#extendLock` treats as the "cleaned up" marker. An in-flight
 * extension is not awaited; it only gets a no-op rejection handler attached.
 *
 * @param context - Mutable holder for the pending timer and in-flight extension.
 */
#cleanupExtension(context: ManualLockContext): void {
  const pendingTimer = context.timeout;
  if (pendingTimer) {
    clearTimeout(pendingTimer);
  }

  // null (as opposed to undefined) signals that cleanup has happened
  context.timeout = null;

  // Ignore errors from an extension that is still running during cleanup
  context.extension?.catch(() => {});
}
}
354+
168355
async lockIf<T>(
169356
condition: boolean,
170357
name: string,
171358
resources: string[],
172359
duration: number,
173-
routine: () => Promise<T>
360+
routine: (signal?: redlock.RedlockAbortSignal) => Promise<T>
174361
): Promise<T> {
175362
if (condition) {
176363
return this.lock(name, resources, duration, routine);

0 commit comments

Comments
 (0)