Skip to content

Commit 13f7eff

Browse files
waleedlatif1claude
andcommitted
fix(data-drains): address PR review thread fixes
- service: throw on cancellation after pages loop so a run aborted mid-stream isn't recorded as success - audit-logs: include org-scoped rows (workspace_id IS NULL with metadata->>organizationId match) alongside workspace rows - access: require owner/admin for read routes too; drain configs leak bucket names and webhook URLs Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent e7e2954 commit 13f7eff

3 files changed

Lines changed: 34 additions & 20 deletions

File tree

apps/sim/lib/data-drains/access.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ export type DrainAccessResult =
3030

3131
/**
3232
* Centralizes the auth + membership + role + enterprise-plan gates that all
33-
* data-drain routes share. `requireMutating` flag enforces owner/admin role
34-
* and enterprise plan; read-only callers can pass `false` to allow any
35-
* organization member.
33+
* data-drain routes share. Owner/admin role is required for both reads and
34+
* writes — drain configs expose customer-controlled bucket names and webhook
35+
* URLs, which a regular member shouldn't be able to enumerate.
3636
*/
3737
export async function authorizeDrainAccess(
3838
organizationId: string,
@@ -84,15 +84,17 @@ export async function authorizeDrainAccess(
8484
}
8585
}
8686
}
87-
if (options.requireMutating) {
88-
if (memberEntry.role !== 'owner' && memberEntry.role !== 'admin') {
89-
return {
90-
ok: false,
91-
response: NextResponse.json(
92-
{ error: 'Forbidden - Only organization owners and admins can manage data drains' },
93-
{ status: 403 }
94-
),
95-
}
87+
if (memberEntry.role !== 'owner' && memberEntry.role !== 'admin') {
88+
return {
89+
ok: false,
90+
response: NextResponse.json(
91+
{
92+
error: options.requireMutating
93+
? 'Forbidden - Only organization owners and admins can manage data drains'
94+
: 'Forbidden - Only organization owners and admins can view data drains',
95+
},
96+
{ status: 403 }
97+
),
9698
}
9799
}
98100

apps/sim/lib/data-drains/service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ export async function runDrain(
113113
sequence++
114114
}
115115

116+
if (signal.aborted) {
117+
throw new Error('Data drain run cancelled')
118+
}
119+
116120
const finishedAt = new Date()
117121
await db.transaction(async (tx) => {
118122
await tx

apps/sim/lib/data-drains/sources/audit-logs.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { db } from '@sim/db'
22
import { auditLog } from '@sim/db/schema'
3-
import { and, asc, inArray } from 'drizzle-orm'
3+
import { and, asc, inArray, isNull, or, sql } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
@@ -12,15 +12,23 @@ import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/typ
1212
type AuditLogRow = typeof auditLog.$inferSelect
1313

1414
/**
15-
* Drains workspace-scoped audit events for every workspace owned by the
16-
* organization. Org-scoped events (rows with `workspace_id IS NULL`, e.g.
17-
* org-level invitations) are intentionally out of scope — `audit_log` has no
18-
* `organization_id` column to filter on, so cleanly attributing them is not
19-
* possible without a schema change. Track separately if needed.
15+
* Drains audit events scoped to the organization: rows from any of the org's
16+
* workspaces, plus org-level rows (`workspace_id IS NULL`) where
17+
* `metadata->>'organizationId'` matches. Audit-log writers consistently set
18+
* `metadata.organizationId` for org-scoped actions even though the table has
19+
* no dedicated FK column.
2020
*/
2121
async function* pages(input: SourcePageInput): AsyncIterable<AuditLogRow[]> {
2222
const workspaceIds = await getOrganizationWorkspaceIds(input.organizationId)
23-
if (workspaceIds.length === 0) return
23+
24+
const orgScopedClause = and(
25+
isNull(auditLog.workspaceId),
26+
sql`${auditLog.metadata}->>'organizationId' = ${input.organizationId}`
27+
)
28+
const scopeClause =
29+
workspaceIds.length === 0
30+
? orgScopedClause
31+
: or(inArray(auditLog.workspaceId, workspaceIds), orgScopedClause)
2432

2533
let cursor = decodeTimeCursor(input.cursor)
2634
while (!input.signal.aborted) {
@@ -29,7 +37,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<AuditLogRow[]> {
2937
const rows = await db
3038
.select()
3139
.from(auditLog)
32-
.where(and(inArray(auditLog.workspaceId, workspaceIds), cursorClause))
40+
.where(and(scopeClause, cursorClause))
3341
.orderBy(asc(auditLog.createdAt), asc(auditLog.id))
3442
.limit(input.chunkSize)
3543

0 commit comments

Comments
 (0)