From 5edaa42c5f2eb37daee6b9523499698e719ce369 Mon Sep 17 00:00:00 2001 From: Vaibhav Arora Date: Sat, 13 Jun 2026 00:07:04 +0530 Subject: [PATCH] fix(cursor): page bubble scans by requested range Co-authored-by: Cursor --- src/cursor-cache.ts | 12 +- src/providers/cursor.ts | 232 ++++++++---------- .../cursor-bubble-scan-window.test.ts | 160 ++++++++++++ 3 files changed, 271 insertions(+), 133 deletions(-) create mode 100644 tests/providers/cursor-bubble-scan-window.test.ts diff --git a/src/cursor-cache.ts b/src/cursor-cache.ts index 7d84d262..4ed26b65 100644 --- a/src/cursor-cache.ts +++ b/src/cursor-cache.ts @@ -5,12 +5,10 @@ import { randomBytes } from 'crypto' import type { ParsedProviderCall } from './providers/types.js' -// Bumped to 3 for the workspace-aware breakdown change: the cursor parser -// now derives `sessionId` from the bubble row key (the real composer id) -// rather than the empty `conversationId` JSON field, and the workspace -// router relies on those composer ids to bucket calls per project. -// Version 2 caches contain `sessionId: 'unknown'` for every call and would -// route everything to the orphan project, so we invalidate them. +// Bumped to 4 for the paged Cursor bubble scan: version 3 caches could have +// been populated from the historic "newest 250k bubbles" cap and therefore +// may be missing older in-window calls even when the DB file itself has not +// changed. const CURSOR_CACHE_VERSION = 4 type ResultCache = { @@ -24,7 +22,7 @@ type ResultCache = { const CACHE_FILE = 'cursor-results.json' function getCacheDir(): string { - return join(homedir(), '.cache', 'codeburn') + return process.env['CODEBURN_CACHE_DIR'] ?? join(homedir(), '.cache', 'codeburn') } function getCachePath(): string { diff --git a/src/providers/cursor.ts b/src/providers/cursor.ts index 21cdce47..d7ffb28d 100644 --- a/src/providers/cursor.ts +++ b/src/providers/cursor.ts @@ -25,6 +25,10 @@ export function getCursorTimeFloor(dateRange?: DateRange): string { const CURSOR_COST_MODEL = 'claude-sonnet-4-5' +function verbose(): boolean { + return process.env['CODEBURN_VERBOSE'] === '1' +} + const modelDisplayNames: Record = { 'claude-4.5-opus-high-thinking': 'Opus 4.5 (Thinking)', 'claude-4-opus': 'Opus 4', @@ -43,6 +47,7 @@ const modelDisplayNames: Record = { } type BubbleRow = { + row_id: number bubble_key: string input_tokens: number | null output_tokens: number | null @@ -301,6 +306,7 @@ function modelForDisplay(raw: string | null): string { const BUBBLE_QUERY_BASE = ` SELECT + ROWID as row_id, key as bubble_key, json_extract(value, '$.tokenCount.inputTokens') as input_tokens, json_extract(value, '$.tokenCount.outputTokens') as output_tokens, @@ -328,29 +334,13 @@ const AGENTKV_QUERY = ` ORDER BY ROWID ASC ` -const USER_MESSAGES_QUERY = ` - SELECT - json_extract(value, '$.conversationId') as conversation_id, - json_extract(value, '$.createdAt') as created_at, - CAST(substr(json_extract(value, '$.text'), 1, 500) AS BLOB) as text - FROM cursorDiskKV - WHERE key LIKE 'bubbleId:%' - AND json_extract(value, '$.type') = 1 - AND (json_extract(value, '$.createdAt') > ? OR json_extract(value, '$.createdAt') IS NULL) - ORDER BY ROWID ASC -` - -// Split into HEAD (predicates we always emit) and TAIL (ORDER BY) so the -// caller can splice in an optional `ROWID >= ?` cutoff without rewriting -// the whole template. The original combined string is preserved as -// BUBBLE_QUERY_SINCE for any caller that doesn't want the cap. -const BUBBLE_QUERY_SINCE_HEAD = BUBBLE_QUERY_BASE + ` +const BUBBLE_QUERY_PAGE = BUBBLE_QUERY_BASE + ` AND json_extract(value, '$.createdAt') IS NOT NULL - AND json_extract(value, '$.createdAt') > ?` -const BUBBLE_QUERY_SINCE_TAIL = ` - ORDER BY ROWID ASC + AND ROWID < ? + ORDER BY ROWID DESC + LIMIT ? ` -const BUBBLE_QUERY_SINCE = BUBBLE_QUERY_SINCE_HEAD + BUBBLE_QUERY_SINCE_TAIL +const BUBBLE_QUERY_PAGE_SIZE = 10_000 function validateSchema(db: SqliteDatabase): boolean { try { @@ -363,8 +353,6 @@ function validateSchema(db: SqliteDatabase): boolean { } } -type UserMsgRow = { conversation_id: string; created_at: string; text: Uint8Array | string } - /// Per-conversation user-message buffer. We pop messages in arrival order via /// the `pos` cursor — a previous implementation called Array.shift() which is /// O(n) per call on large conversations and pinned multi-GB Cursor DBs at @@ -374,21 +362,21 @@ type UserMessageQueue = { pos: number } -function buildUserMessageMap(db: SqliteDatabase, timeFloor: string): Map { +function buildUserMessageMap(rows: BubbleRow[]): Map { const map = new Map() - try { - const rows = db.query(USER_MESSAGES_QUERY, [timeFloor]) - for (const row of rows) { - if (!row.conversation_id || !row.text) continue - const text = blobToText(row.text) - const existing = map.get(row.conversation_id) - if (existing) { - existing.messages.push(text) - } else { - map.set(row.conversation_id, { messages: [text], pos: 0 }) - } + + for (const row of rows) { + if (row.bubble_type !== 1 || !row.user_text) continue + const conversationId = parseComposerIdFromKey(row.bubble_key) + if (!conversationId) continue + const text = blobToText(row.user_text) + const existing = map.get(conversationId) + if (existing) { + existing.messages.push(text) + } else { + map.set(conversationId, { messages: [text], pos: 0 }) } - } catch {} + } return map } @@ -400,66 +388,36 @@ function takeUserMessage(queues: Map, conversationId: return msg } -function parseBubbles( - db: SqliteDatabase, - seenKeys: Set, - timeFloor: string, -): { calls: ParsedProviderCall[] } { +function parseBubbles(db: SqliteDatabase, seenKeys: Set, timeFloor: string): { calls: ParsedProviderCall[] } { const results: ParsedProviderCall[] = [] let skipped = 0 - // Hard cap on rows to scan. The BUBBLE_QUERY_SINCE filter relies on - // json_extract over the value BLOB, which SQLite cannot serve from an - // index — every row is JSON-decoded. Multi-GB Cursor DBs (power users, - // years of usage) regularly exceed 500k bubble rows and were producing - // 30s+ parse stalls. Compute a ROWID cutoff that limits the scan to the - // MAX_BUBBLES most-recent bubbles when the user is over the cap, and - // warn so they know older sessions may be missing. - const MAX_BUBBLES = 250_000 - let rowIdCutoff = 0 + const collected: BubbleRow[] = [] + let beforeRowId = Number.MAX_SAFE_INTEGER try { - const countRows = db.query<{ cnt: number }>( - "SELECT COUNT(*) as cnt FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'" - ) - const total = countRows[0]?.cnt ?? 0 - if (total > MAX_BUBBLES) { - // Find the ROWID of the (MAX_BUBBLES)th most-recent bubble. Anything - // below this rowid is older and gets skipped. Bubbles are written - // chronologically so ROWID order ≈ insertion order. - const cutoffRows = db.query<{ rid: number }>( - `SELECT MIN(rid) as rid FROM ( - SELECT ROWID as rid FROM cursorDiskKV - WHERE key LIKE 'bubbleId:%' - ORDER BY ROWID DESC - LIMIT ? - )`, - [MAX_BUBBLES] - ) - rowIdCutoff = cutoffRows[0]?.rid ?? 0 - process.stderr.write( - `codeburn: Cursor database has ${total.toLocaleString()} bubbles, ` + - `scanning the most recent ${MAX_BUBBLES.toLocaleString()}. ` + - `Older sessions may be missing from this report.\n` - ) + while (true) { + const rows = db.query(BUBBLE_QUERY_PAGE, [beforeRowId, BUBBLE_QUERY_PAGE_SIZE]) + if (rows.length === 0) break + beforeRowId = rows[rows.length - 1]!.row_id + + let allRowsOutsideWindow = true + for (const row of rows) { + const createdAt = row.created_at ?? '' + if (createdAt > timeFloor) { + collected.push(row) + allRowsOutsideWindow = false + } + } + if (allRowsOutsideWindow) break } - } catch { /* best-effort diagnostic */ } - - const userMessages = buildUserMessageMap(db, timeFloor) - - // Append the rowid cutoff when active. Empty string when not capped so the - // query string compares identically to the un-capped version on small DBs. - const rowIdFilter = rowIdCutoff > 0 ? ' AND ROWID >= ?' : '' - const params: unknown[] = rowIdCutoff > 0 ? [timeFloor, rowIdCutoff] : [timeFloor] - const cappedQuery = BUBBLE_QUERY_SINCE_HEAD + rowIdFilter + BUBBLE_QUERY_SINCE_TAIL - - let rows: BubbleRow[] - try { - rows = db.query(cappedQuery, params) } catch { return { calls: results } } - for (const row of rows) { + collected.sort((a, b) => a.row_id - b.row_id) + const userMessages = buildUserMessageMap(collected) + + for (const row of collected) { try { let inputTokens = row.input_tokens ?? 0 let outputTokens = row.output_tokens ?? 0 @@ -540,7 +498,7 @@ function parseBubbles( } } - if (skipped > 0) { + if (skipped > 0 && verbose()) { process.stderr.write(`codeburn: skipped ${skipped} unreadable Cursor entries\n`) } @@ -677,13 +635,62 @@ function parseAgentKv(db: SqliteDatabase, seenKeys: Set, dbPath: string) return { calls: results } } -function createParser( - source: SessionSource, - seenKeys: Set, - dateRange?: DateRange, -): SessionParser { - const timeFloor = getCursorTimeFloor(dateRange) +const parsedDbCache = new Map>() + +function parsedDbCacheKey(dbPath: string, timeFloor: string): string { + try { + const s = statSync(dbPath) + return `${dbPath}:${s.mtimeMs}:${s.size}:${timeFloor}` + } catch { + return `${dbPath}:missing:${timeFloor}` + } +} + +async function loadParsedCursorCalls(dbPath: string, timeFloor: string): Promise { + const cacheKey = parsedDbCacheKey(dbPath, timeFloor) + const existing = parsedDbCache.get(cacheKey) + if (existing) return existing + + const promise = (async () => { + const cached = await readCachedResults(dbPath, timeFloor) + if (cached) return cached + let db: SqliteDatabase + try { + db = openDatabase(dbPath) + } catch (err) { + process.stderr.write(`codeburn: cannot open Cursor database: ${err instanceof Error ? err.message : err}\n`) + return null + } + try { + if (!validateSchema(db)) { + process.stderr.write('codeburn: Cursor storage format not recognized. You may need to update CodeBurn.\n') + return null + } + // Use a fresh local Set for intra-parse dedup so the global + // seenKeys is not mutated by calls that the workspace filter is + // about to drop. Cross-source dedup happens at yield time. + const localSeen = new Set() + const { calls: bubbleCalls } = parseBubbles(db, localSeen, timeFloor) + const { calls: agentKvCalls } = parseAgentKv(db, localSeen, dbPath) + const allCalls = [...bubbleCalls, ...agentKvCalls] + await writeCachedResults(dbPath, allCalls, timeFloor) + return allCalls + } finally { + db.close() + } + })() + + parsedDbCache.set(cacheKey, promise) + try { + return await promise + } catch (err) { + parsedDbCache.delete(cacheKey) + throw err + } +} + +function createParser(source: SessionSource, seenKeys: Set, dateRange?: DateRange): SessionParser { return { async *parse(): AsyncGenerator { if (!isSqliteAvailable()) { @@ -718,38 +725,11 @@ function createParser( } } - // Cache is keyed on the bare DB path so multiple workspace-scoped - // sources reuse one parsed bubble set per CLI run. Filtering happens - // post-cache so each source emits only its own composers. - let allCalls: ParsedProviderCall[] | null = null - const cached = await readCachedResults(dbPath, timeFloor) - if (cached) { - allCalls = cached - } else { - let db: SqliteDatabase - try { - db = openDatabase(dbPath) - } catch (err) { - process.stderr.write(`codeburn: cannot open Cursor database: ${err instanceof Error ? err.message : err}\n`) - return - } - try { - if (!validateSchema(db)) { - process.stderr.write('codeburn: Cursor storage format not recognized. You may need to update CodeBurn.\n') - return - } - // Use a fresh local Set for intra-parse dedup so the global - // seenKeys is not mutated by calls that the workspace filter is - // about to drop. Cross-source dedup happens at yield time. - const localSeen = new Set() - const { calls: bubbleCalls } = parseBubbles(db, localSeen, timeFloor) - const { calls: agentKvCalls } = parseAgentKv(db, localSeen, dbPath) - allCalls = [...bubbleCalls, ...agentKvCalls] - await writeCachedResults(dbPath, allCalls, timeFloor) - } finally { - db.close() - } - } + // Cache is keyed on the bare DB path and requested lower bound so multiple + // workspace-scoped sources reuse one parsed bubble set per CLI run. + // Filtering happens post-cache so each source emits only its own composers. + const allCalls = await loadParsedCursorCalls(dbPath, getCursorTimeFloor(dateRange)) + if (!allCalls) return for (const call of allCalls) { if (composerFilter !== null) { diff --git a/tests/providers/cursor-bubble-scan-window.test.ts b/tests/providers/cursor-bubble-scan-window.test.ts new file mode 100644 index 00000000..1b26a09e --- /dev/null +++ b/tests/providers/cursor-bubble-scan-window.test.ts @@ -0,0 +1,160 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { mkdtemp, rm } from 'fs/promises' +import { join } from 'path' +import { tmpdir } from 'os' +import { createRequire } from 'node:module' + +import { createCursorProvider } from '../../src/providers/cursor.js' +import { isSqliteAvailable } from '../../src/sqlite.js' +import type { ParsedProviderCall } from '../../src/providers/types.js' +import type { DateRange } from '../../src/types.js' + +const requireForTest = createRequire(import.meta.url) +const skipUnlessSqlite = isSqliteAvailable() ? describe : describe.skip + +let tmpDir: string + +beforeEach(async () => { + vi.useFakeTimers() + vi.setSystemTime(new Date('2026-06-12T12:00:00.000Z')) + tmpDir = await mkdtemp(join(tmpdir(), 'cursor-scan-window-')) + process.env['CODEBURN_CACHE_DIR'] = join(tmpDir, 'cache') +}) + +afterEach(async () => { + vi.useRealTimers() + delete process.env['CODEBURN_CACHE_DIR'] + await rm(tmpDir, { recursive: true, force: true }) +}) + +function bubbleValue(opts: { + type: 1 | 2 + conversationId: string + createdAt: string + text: string + inputTokens: number + outputTokens: number +}): string { + return JSON.stringify({ + type: opts.type, + conversationId: opts.conversationId, + createdAt: opts.createdAt, + text: opts.text, + tokenCount: { + inputTokens: opts.inputTokens, + outputTokens: opts.outputTokens, + }, + modelInfo: { modelName: 'claude-4.6-sonnet' }, + codeBlocks: '[]', + }) +} + +function createDbWithOlderInWindowBubblePastHistoricCap(): string { + const dbPath = join(tmpDir, 'state.vscdb') + const { DatabaseSync: Database } = requireForTest('node:sqlite') + const db = new Database(dbPath) + db.exec('PRAGMA journal_mode = OFF') + db.exec('PRAGMA synchronous = OFF') + db.exec('CREATE TABLE cursorDiskKV (key TEXT PRIMARY KEY, value BLOB)') + db.exec('BEGIN') + + const insert = db.prepare('INSERT INTO cursorDiskKV (key, value) VALUES (?, ?)') + const olderComposerId = 'older-composer' + insert.run( + `bubbleId:${olderComposerId}:bubble-important`, + bubbleValue({ + type: 2, + conversationId: olderComposerId, + createdAt: '2026-02-12T12:00:00.000Z', + text: 'important older assistant response', + inputTokens: 100, + outputTokens: 50, + }), + ) + + const emptyNewerBubble = bubbleValue({ + type: 2, + conversationId: 'empty-newer', + createdAt: '2026-06-11T12:00:00.000Z', + text: '', + inputTokens: 0, + outputTokens: 0, + }) + const manyNewerEmptyBubbles = db.prepare('INSERT INTO cursorDiskKV (key, value) VALUES (?, ?)') + for (let i = 0; i < 250_001; i += 1) { + manyNewerEmptyBubbles.run(`bubbleId:empty-newer:bubble-${i}`, emptyNewerBubble) + } + + db.exec('COMMIT') + db.close() + return dbPath +} + +function createDbWithOldAndRequestedRangeBubbles(): string { + const dbPath = join(tmpDir, 'state.vscdb') + const { DatabaseSync: Database } = requireForTest('node:sqlite') + const db = new Database(dbPath) + db.exec('CREATE TABLE cursorDiskKV (key TEXT PRIMARY KEY, value BLOB)') + + const insert = db.prepare('INSERT INTO cursorDiskKV (key, value) VALUES (?, ?)') + insert.run( + 'bubbleId:old-composer:bubble-old', + bubbleValue({ + type: 2, + conversationId: 'old-composer', + createdAt: '2026-06-10T12:00:00.000Z', + text: 'older assistant response', + inputTokens: 100, + outputTokens: 50, + }), + ) + insert.run( + 'bubbleId:today-composer:bubble-today', + bubbleValue({ + type: 2, + conversationId: 'today-composer', + createdAt: '2026-06-12T08:00:00.000Z', + text: 'today assistant response', + inputTokens: 100, + outputTokens: 50, + }), + ) + + db.close() + return dbPath +} + +async function collect(dbPath: string, dateRange?: DateRange): Promise { + const provider = createCursorProvider(dbPath) + const source = { path: dbPath, project: 'cursor', provider: 'cursor' } + const calls: ParsedProviderCall[] = [] + for await (const call of provider.createSessionParser(source, new Set(), dateRange).parse()) { + calls.push(call) + } + return calls +} + +skipUnlessSqlite('cursor bubble scan window', () => { + it('does not drop older in-window bubbles when newer rows exceed the historic cap', async () => { + const dbPath = createDbWithOlderInWindowBubblePastHistoricCap() + const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true) + + const calls = await collect(dbPath) + + expect(calls.map(call => call.sessionId)).toContain('older-composer') + expect(String(stderrSpy.mock.calls.flat().join(''))).not.toContain('Older sessions may be missing') + + stderrSpy.mockRestore() + }) + + it('uses the requested date range as the bubble scan lower bound', async () => { + const dbPath = createDbWithOldAndRequestedRangeBubbles() + + const calls = await collect(dbPath, { + start: new Date('2026-06-12T00:00:00.000Z'), + end: new Date('2026-06-12T23:59:59.999Z'), + }) + + expect(calls.map(call => call.sessionId)).toEqual(['today-composer']) + }) +})