Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions apps/sim/lib/core/config/redis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,37 @@ describe('redis config', () => {
expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
})

it('should drop the cached client so the next getRedisClient() builds a fresh one', async () => {
getRedisClient()
const callsBefore = MockRedisConstructor.mock.calls.length

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)

getRedisClient()
expect(MockRedisConstructor.mock.calls.length).toBe(callsBefore + 1)
})

it('should restart the PING health check against the new client', async () => {
getRedisClient()

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(mockRedisInstance.disconnect).toHaveBeenCalledTimes(1)

getRedisClient()

await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(mockRedisInstance.disconnect).toHaveBeenCalledTimes(2)
})

it('should handle listener errors gracefully without breaking health check', async () => {
const badListener = vi.fn(() => {
throw new Error('listener crashed')
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/lib/core/config/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ function startPingHealthCheck(redis: Redis): void {
consecutiveFailures: pingFailures,
})
pingFailures = 0
// Drop the cached client and stop this health check before disconnecting,
// so the next getRedisClient() builds a fresh client and a fresh PING loop.
// Listeners may call getRedisClient() and must observe the cleared global.
globalRedisClient = null
if (pingInterval) {
clearInterval(pingInterval)
pingInterval = null
}
for (const cb of reconnectListeners) {
try {
cb()
Expand Down
12 changes: 11 additions & 1 deletion apps/sim/lib/execution/isolated-vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,20 @@ async function releaseDistributedLease(ownerKey: string, leaseId: string): Promi
return 1
`

let deadlineTimer: NodeJS.Timeout | undefined
const deadline = new Promise<never>((_, reject) => {
deadlineTimer = setTimeout(
() => reject(new Error(`Redis lease release timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
LEASE_REDIS_DEADLINE_MS
)
})

try {
await redis.eval(script, 1, key, leaseId)
await Promise.race([redis.eval(script, 1, key, leaseId), deadline])
} catch (error) {
logger.error('Failed to release distributed owner lease', { ownerKey, error })
} finally {
clearTimeout(deadlineTimer)
}
}

Expand Down
Loading