|
1 | 1 | import { db } from '@sim/db' |
2 | 2 | import { dataDrainRuns, dataDrains } from '@sim/db/schema' |
3 | 3 | import { createLogger } from '@sim/logger' |
| 4 | +import { toError } from '@sim/utils/errors' |
4 | 5 | import { and, eq, isNull, lt, or } from 'drizzle-orm' |
5 | 6 | import { isOrganizationOnEnterprisePlan } from '@/lib/billing/core/subscription' |
6 | 7 | import { getJobQueue } from '@/lib/core/async-jobs' |
@@ -148,11 +149,22 @@ export async function dispatchDueDrains(now: Date = new Date()): Promise<{ |
148 | 149 | } catch (error) { |
149 | 150 | // Roll back the claim — otherwise a transient queue outage silently |
150 | 151 | // delays this drain by a full cadence. We only revert if no other |
151 | | - // process has advanced lastRunAt past our claim timestamp. |
152 | | - await db |
153 | | - .update(dataDrains) |
154 | | - .set({ lastRunAt: candidate.lastRunAt, updatedAt: now }) |
155 | | - .where(and(eq(dataDrains.id, candidate.id), eq(dataDrains.lastRunAt, now))) |
| 152 | + // process has advanced lastRunAt past our claim timestamp. Guard the |
| 153 | + // rollback itself so a transient DB error here doesn't abort the |
| 154 | + // remaining candidates in the batch. |
| 155 | + try { |
| 156 | + await db |
| 157 | + .update(dataDrains) |
| 158 | + .set({ lastRunAt: candidate.lastRunAt, updatedAt: now }) |
| 159 | + .where(and(eq(dataDrains.id, candidate.id), eq(dataDrains.lastRunAt, now))) |
| 160 | + } catch (rollbackError) { |
| 161 | + logger.error('Failed to roll back data-drain claim after enqueue failure', { |
| 162 | + drainId: candidate.id, |
| 163 | + enqueueError: toError(error).message, |
| 164 | + rollbackError: toError(rollbackError).message, |
| 165 | + }) |
| 166 | + continue |
| 167 | + } |
156 | 168 | logger.error('Failed to enqueue data-drain job; rolled back claim', { |
157 | 169 | drainId: candidate.id, |
158 | 170 | error, |
|
0 commit comments