Skip to content

Commit 7100fe0

Browse files
waleedlatif1claude
andcommitted
fix(data-drains): handle concurrent delete and DB outage in failure path
Two robustness fixes: - PUT /data-drains/[drainId]: return 404 if returning() yields no row, i.e. a concurrent DELETE landed between loadDrain and the UPDATE. - runDrain catch block: wrap the failed-status transaction so a DB outage during the status write doesn't mask the original delivery error. The reaper will eventually rewrite the row to failed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent feb6827 commit 7100fe0

2 files changed

Lines changed: 35 additions & 18 deletions

File tree

apps/sim/app/api/organizations/[id]/data-drains/[drainId]/route.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,11 @@ export const PUT = withRouteHandler(async (request: NextRequest, context: RouteC
105105
.where(eq(dataDrains.id, drainId))
106106
.returning()
107107

108+
if (!updated) {
109+
// Concurrent DELETE landed between loadDrain() and this UPDATE.
110+
return NextResponse.json({ error: 'Data drain not found' }, { status: 404 })
111+
}
112+
108113
logger.info('Data drain updated', { drainId, organizationId })
109114

110115
recordAudit({

apps/sim/lib/data-drains/service.ts

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -160,24 +160,36 @@ export async function runDrain(
160160
} catch (error) {
161161
const finishedAt = new Date()
162162
const message = toError(error).message
163-
await db.transaction(async (tx) => {
164-
await tx
165-
.update(dataDrains)
166-
.set({ lastRunAt: finishedAt, updatedAt: finishedAt })
167-
.where(eq(dataDrains.id, drainId))
168-
await tx
169-
.update(dataDrainRuns)
170-
.set({
171-
status: 'failed',
172-
finishedAt,
173-
rowsExported,
174-
bytesWritten,
175-
cursorAfter: cursorBefore, // cursor not advanced on failure
176-
locators,
177-
error: message.slice(0, 4000),
178-
})
179-
.where(eq(dataDrainRuns.id, runId))
180-
})
163+
try {
164+
await db.transaction(async (tx) => {
165+
await tx
166+
.update(dataDrains)
167+
.set({ lastRunAt: finishedAt, updatedAt: finishedAt })
168+
.where(eq(dataDrains.id, drainId))
169+
await tx
170+
.update(dataDrainRuns)
171+
.set({
172+
status: 'failed',
173+
finishedAt,
174+
rowsExported,
175+
bytesWritten,
176+
cursorAfter: cursorBefore, // cursor not advanced on failure
177+
locators,
178+
error: message.slice(0, 4000),
179+
})
180+
.where(eq(dataDrainRuns.id, runId))
181+
})
182+
} catch (statusError) {
183+
// Don't let a failed status-update mask the real delivery error —
184+
// the reaper will eventually rewrite the row to `failed`. Log so we
185+
// can spot DB outages that would otherwise hide behind delivery errors.
186+
logger.error('Failed to record data drain failure status', {
187+
drainId,
188+
runId,
189+
deliveryError: message,
190+
statusError: toError(statusError).message,
191+
})
192+
}
181193

182194
logger.error('Data drain run failed', {
183195
drainId,

0 commit comments

Comments
 (0)