@@ -71,15 +71,17 @@ export interface RecordUsageParams {
7171}
7272
7373/**
74- * Records usage in a single atomic transaction .
74+ * Records usage by inserting into usage_log and incrementing userStats counters .
7575 *
76- * Inserts all entries into usage_log and updates userStats counters
77- * (totalCost, currentPeriodCost, lastActive) within one Postgres transaction.
78- * The total cost added to userStats is derived from summing entry costs,
79- * ensuring usage_log and currentPeriodCost can never drift apart.
76+ * The two writes are intentionally not wrapped in a transaction: under high
77+ * concurrency for the same userId, holding BEGIN/COMMIT across the user_stats
78+ * row-lock wait pins pgbouncer connections and exhausts the pool.
8079 *
81- * If billing is disabled, total cost is zero, or no entries have positive cost,
82- * this function returns early without writing anything.
80+ * usage_log is the source of truth and the INSERT propagates errors to the
81+ * caller. The userStats UPDATE is best-effort: failures (and missing-row
82+ * cases) are logged as warnings and swallowed. Counter drift is acceptable
83+ * here — the long-term plan is to derive counters from usage_log directly.
84+ * Any drift warning in logs is a signal that needs investigation.
8385 */
8486export async function recordUsage ( params : RecordUsageParams ) : Promise < void > {
8587 if ( ! isBillingEnabled ) {
@@ -103,47 +105,56 @@ export async function recordUsage(params: RecordUsageParams): Promise<void> {
103105 ? Object . fromEntries ( Object . entries ( additionalStats ) . filter ( ( [ k ] ) => ! RESERVED_KEYS . has ( k ) ) )
104106 : undefined
105107
106- await db . transaction ( async ( tx ) => {
107- if ( validEntries . length > 0 ) {
108- await tx . insert ( usageLog ) . values (
109- validEntries . map ( ( entry ) => ( {
110- id : generateId ( ) ,
111- userId,
112- category : entry . category ,
113- source : entry . source ,
114- description : entry . description ,
115- metadata : entry . metadata ?? null ,
116- cost : entry . cost . toString ( ) ,
117- workspaceId : workspaceId ?? null ,
118- workflowId : workflowId ?? null ,
119- executionId : executionId ?? null ,
120- } ) )
121- )
122- }
108+ if ( validEntries . length > 0 ) {
109+ await db . insert ( usageLog ) . values (
110+ validEntries . map ( ( entry ) => ( {
111+ id : generateId ( ) ,
112+ userId,
113+ category : entry . category ,
114+ source : entry . source ,
115+ description : entry . description ,
116+ metadata : entry . metadata ?? null ,
117+ cost : entry . cost . toString ( ) ,
118+ workspaceId : workspaceId ?? null ,
119+ workflowId : workflowId ?? null ,
120+ executionId : executionId ?? null ,
121+ } ) )
122+ )
123+ }
123124
124- const updateFields : Record < string , SQL | Date > = {
125- lastActive : new Date ( ) ,
126- ...( totalCost > 0 && {
127- totalCost : sql `total_cost + ${ totalCost } ` ,
128- currentPeriodCost : sql `current_period_cost + ${ totalCost } ` ,
129- } ) ,
130- ...safeStats ,
131- }
125+ const updateFields : Record < string , SQL | Date > = {
126+ lastActive : new Date ( ) ,
127+ ...( totalCost > 0 && {
128+ totalCost : sql `total_cost + ${ totalCost } ` ,
129+ currentPeriodCost : sql `current_period_cost + ${ totalCost } ` ,
130+ } ) ,
131+ ...safeStats ,
132+ }
132133
133- const result = await tx
134+ try {
135+ const result = await db
134136 . update ( userStats )
135137 . set ( updateFields )
136138 . where ( eq ( userStats . userId , userId ) )
137139 . returning ( { userId : userStats . userId } )
138140
139141 if ( result . length === 0 ) {
140- logger . warn ( 'recordUsage: userStats row not found, transaction will roll back ' , {
142+ logger . warn ( 'recordUsage: userStats row not found; counter increment dropped ' , {
141143 userId,
142144 totalCost,
145+ hadEntries : validEntries . length > 0 ,
146+ additionalStatsKeys : safeStats ? Object . keys ( safeStats ) : [ ] ,
143147 } )
144- throw new Error ( `userStats row not found for userId: ${ userId } ` )
145148 }
146- } )
149+ } catch ( error ) {
150+ logger . warn ( 'recordUsage: userStats update failed; counter increment dropped' , {
151+ error : toError ( error ) . message ,
152+ userId,
153+ totalCost,
154+ hadEntries : validEntries . length > 0 ,
155+ additionalStatsKeys : safeStats ? Object . keys ( safeStats ) : [ ] ,
156+ } )
157+ }
147158
148159 logger . debug ( 'Recorded usage' , {
149160 userId,
0 commit comments