From 8e5305b07cdba37ce76656d0f2ea8b9d4070763f Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 09:59:19 -0700 Subject: [PATCH 1/2] fix(billing): drop transaction wrapper in recordUsage to relieve pool contention --- apps/sim/lib/billing/core/usage-log.ts | 77 +++++++++++++------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index a2400d28a85..c75d65dc3a9 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -71,15 +71,13 @@ export interface RecordUsageParams { } /** - * Records usage in a single atomic transaction. + * Records usage by inserting into usage_log and incrementing userStats counters. * - * Inserts all entries into usage_log and updates userStats counters - * (totalCost, currentPeriodCost, lastActive) within one Postgres transaction. - * The total cost added to userStats is derived from summing entry costs, - * ensuring usage_log and currentPeriodCost can never drift apart. - * - * If billing is disabled, total cost is zero, or no entries have positive cost, - * this function returns early without writing anything. + * The two writes are intentionally not wrapped in a transaction: under high + * concurrency for the same userId, holding BEGIN/COMMIT across the user_stats + * row-lock wait pins pgbouncer connections and exhausts the pool. usage_log + * is the source of truth; if the userStats UPDATE fails the counter drifts + * and must be reconciled from usage_log out-of-band. */ export async function recordUsage(params: RecordUsageParams): Promise { if (!isBillingEnabled) { @@ -103,47 +101,52 @@ export async function recordUsage(params: RecordUsageParams): Promise { ? Object.fromEntries(Object.entries(additionalStats).filter(([k]) => !RESERVED_KEYS.has(k))) : undefined - await db.transaction(async (tx) => { - if (validEntries.length > 0) { - await tx.insert(usageLog).values( - validEntries.map((entry) => ({ - id: generateId(), - userId, - category: entry.category, - source: entry.source, - description: entry.description, - metadata: entry.metadata ?? null, - cost: entry.cost.toString(), - workspaceId: workspaceId ?? null, - workflowId: workflowId ?? null, - executionId: executionId ?? null, - })) - ) - } + if (validEntries.length > 0) { + await db.insert(usageLog).values( + validEntries.map((entry) => ({ + id: generateId(), + userId, + category: entry.category, + source: entry.source, + description: entry.description, + metadata: entry.metadata ?? null, + cost: entry.cost.toString(), + workspaceId: workspaceId ?? null, + workflowId: workflowId ?? null, + executionId: executionId ?? null, + })) + ) + } - const updateFields: Record = { - lastActive: new Date(), - ...(totalCost > 0 && { - totalCost: sql`total_cost + ${totalCost}`, - currentPeriodCost: sql`current_period_cost + ${totalCost}`, - }), - ...safeStats, - } + const updateFields: Record = { + lastActive: new Date(), + ...(totalCost > 0 && { + totalCost: sql`total_cost + ${totalCost}`, + currentPeriodCost: sql`current_period_cost + ${totalCost}`, + }), + ...safeStats, + } - const result = await tx + try { + const result = await db .update(userStats) .set(updateFields) .where(eq(userStats.userId, userId)) .returning({ userId: userStats.userId }) if (result.length === 0) { - logger.warn('recordUsage: userStats row not found, transaction will roll back', { + logger.warn('recordUsage: userStats row not found; counter will drift from usage_log', { userId, totalCost, }) - throw new Error(`userStats row not found for userId: ${userId}`) } - }) + } catch (error) { + logger.error('recordUsage: userStats update failed; counter will drift from usage_log', { + error: toError(error).message, + userId, + totalCost, + }) + } logger.debug('Recorded usage', { userId, From 938b0a98ecf7757c3bf80b2c12e412de519137c0 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 10:13:04 -0700 Subject: [PATCH 2/2] fix(billing): always warn on userStats drift in recordUsage --- apps/sim/lib/billing/core/usage-log.ts | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index c75d65dc3a9..22fea211230 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -75,9 +75,13 @@ export interface RecordUsageParams { * * The two writes are intentionally not wrapped in a transaction: under high * concurrency for the same userId, holding BEGIN/COMMIT across the user_stats - * row-lock wait pins pgbouncer connections and exhausts the pool. usage_log - * is the source of truth; if the userStats UPDATE fails the counter drifts - * and must be reconciled from usage_log out-of-band. + * row-lock wait pins pgbouncer connections and exhausts the pool. + * + * usage_log is the source of truth and the INSERT propagates errors to the + * caller. The userStats UPDATE is best-effort: failures (and missing-row + * cases) are logged as warnings and swallowed. Counter drift is acceptable + * here — the long-term plan is to derive counters from usage_log directly. + * Any drift warning in logs is a signal that needs investigation. */ export async function recordUsage(params: RecordUsageParams): Promise { if (!isBillingEnabled) { @@ -135,16 +139,20 @@ export async function recordUsage(params: RecordUsageParams): Promise { .returning({ userId: userStats.userId }) if (result.length === 0) { - logger.warn('recordUsage: userStats row not found; counter will drift from usage_log', { + logger.warn('recordUsage: userStats row not found; counter increment dropped', { userId, totalCost, + hadEntries: validEntries.length > 0, + additionalStatsKeys: safeStats ? Object.keys(safeStats) : [], }) } } catch (error) { - logger.error('recordUsage: userStats update failed; counter will drift from usage_log', { + logger.warn('recordUsage: userStats update failed; counter increment dropped', { error: toError(error).message, userId, totalCost, + hadEntries: validEntries.length > 0, + additionalStatsKeys: safeStats ? Object.keys(safeStats) : [], }) }