From 331d735a73886a0731afb4348f6e7936eb6c7cf8 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 15:32:37 -0700 Subject: [PATCH] fix(redis): drop cached client and restart PING loop after forced reconnect --- apps/sim/lib/core/config/redis.test.ts | 31 ++++++++++++++++++++++++++ apps/sim/lib/core/config/redis.ts | 8 +++++++ apps/sim/lib/execution/isolated-vm.ts | 12 +++++++++- 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/core/config/redis.test.ts b/apps/sim/lib/core/config/redis.test.ts index b41ddb3da5b..85ccc6c2e1a 100644 --- a/apps/sim/lib/core/config/redis.test.ts +++ b/apps/sim/lib/core/config/redis.test.ts @@ -90,6 +90,37 @@ describe('redis config', () => { expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true) }) + it('should drop the cached client so the next getRedisClient() builds a fresh one', async () => { + getRedisClient() + const callsBefore = MockRedisConstructor.mock.calls.length + + mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT')) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) + + expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true) + + getRedisClient() + expect(MockRedisConstructor.mock.calls.length).toBe(callsBefore + 1) + }) + + it('should restart the PING health check against the new client', async () => { + getRedisClient() + + mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT')) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) + + expect(mockRedisInstance.disconnect).toHaveBeenCalledTimes(1) + + getRedisClient() + + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) + + expect(mockRedisInstance.disconnect).toHaveBeenCalledTimes(2) + }) + it('should handle listener errors gracefully without breaking health check', async () => { const badListener = vi.fn(() => { throw new Error('listener crashed') diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts index 7e60fbe8e81..c6e904ec7b3 100644 --- a/apps/sim/lib/core/config/redis.ts +++ b/apps/sim/lib/core/config/redis.ts @@ -47,6 +47,14 @@ function startPingHealthCheck(redis: Redis): void { consecutiveFailures: pingFailures, }) pingFailures = 0 + // Drop the cached client and stop this health check before disconnecting, + // so the next getRedisClient() builds a fresh client and a fresh PING loop. + // Listeners may call getRedisClient() and must observe the cleared global. + globalRedisClient = null + if (pingInterval) { + clearInterval(pingInterval) + pingInterval = null + } for (const cb of reconnectListeners) { try { cb() diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 7ada4f158fe..b35687d437a 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -425,10 +425,20 @@ async function releaseDistributedLease(ownerKey: string, leaseId: string): Promi return 1 ` + let deadlineTimer: NodeJS.Timeout | undefined + const deadline = new Promise((_, reject) => { + deadlineTimer = setTimeout( + () => reject(new Error(`Redis lease release timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)), + LEASE_REDIS_DEADLINE_MS + ) + }) + try { - await redis.eval(script, 1, key, leaseId) + await Promise.race([redis.eval(script, 1, key, leaseId), deadline]) } catch (error) { logger.error('Failed to release distributed owner lease', { ownerKey, error }) + } finally { + clearTimeout(deadlineTimer) } }