From 1f72cba50521a5e6507be172f01ec989b1badd46 Mon Sep 17 00:00:00 2001 From: Sutu Sebastian Date: Mon, 25 May 2026 11:44:17 +0300 Subject: [PATCH 1/2] feat(index): per-file parse timeouts and worker recycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Process one file per worker message with Promise.race timeouts (10–30s scaled by size), recycle workers every 250 files, route incremental parsing through the worker pool, and surface parse failure counts in the index summary. --- .changeset/parse-worker-hardening.md | 5 + docs/plans/agent-surface-delivery.md | 28 +-- src/application/index-engine.ts | 192 ++++------------ src/application/parse-timeout.test.ts | 33 +++ src/application/parse-timeout.ts | 50 +++++ src/worker-pool.test.ts | 11 + src/worker-pool.ts | 308 ++++++++++++++++++++++---- 7 files changed, 426 insertions(+), 201 deletions(-) create mode 100644 .changeset/parse-worker-hardening.md create mode 100644 src/application/parse-timeout.test.ts create mode 100644 src/application/parse-timeout.ts diff --git a/.changeset/parse-worker-hardening.md b/.changeset/parse-worker-hardening.md new file mode 100644 index 0000000..9a25275 --- /dev/null +++ b/.changeset/parse-worker-hardening.md @@ -0,0 +1,5 @@ +--- +"@stainless-code/codemap": minor +--- + +Add per-file parse timeouts with worker recycle during full and incremental indexing; failures log to errors.log and appear in the index summary. 
diff --git a/docs/plans/agent-surface-delivery.md b/docs/plans/agent-surface-delivery.md index f621f28..1c3de56 100644 --- a/docs/plans/agent-surface-delivery.md +++ b/docs/plans/agent-surface-delivery.md @@ -10,11 +10,11 @@ ## Quick resume -| Next action | Detail | -| -------------------- | ------------------------------------------------------------------------------------------------------------------------------------ | -| **Review / merge** | PR 3 — index lock + error log (branch `feat/index-lock`) when open | -| **Start next** | **PR 4** — trace recipes (`call-path`, `symbol-neighborhood`) or **PR 5** — `affected-tests-recipe` (parallel with 4 after 3 merges) | -| **Do not start yet** | PR 6 (MCP trace tools) until PR 4 land; PR 9 (eval harness) until PR 8 | +| Next action | Detail | +| -------------------- | ------------------------------------------------------------------------------------------ | +| **Review / merge** | PR 3 stack — parse worker hardening (`feat/parse-worker-hardening`) when open | +| **Start next** | **PR 4** — trace recipes or **PR 5** — `affected-tests-recipe` (parallel after PR 3 lands) | +| **Do not start yet** | PR 6 (MCP trace tools) until PR 4 lands; PR 9 (eval harness) until PR 8 | Update the table below when a PR merges or a new branch opens. @@ -35,15 +35,15 @@ Merge each PR to `main` directly. No long-lived integration branch (`feat/agent- Max **3 parallel tracks** at once. 
-| PR | Plans | Status | Blocked by | Parallel with | -| ----- | ----------------------------------------------------------------------------------------------------------------------------- | ------- | --------------------- | --------------------------------- | -| **3** | [`index-lock-and-error-log`](./index-lock-and-error-log.md) → [`parse-worker-hardening`](./parse-worker-hardening.md) (stack) | open | PR 2 merged | 4, 5 | -| **4** | Recipe half of [`mcp-trace-explore-tools`](./mcp-trace-explore-tools.md) (`call-path`, `symbol-neighborhood` SQL + tests) | planned | — | 3, 5 | -| **5** | [`affected-tests-recipe`](./affected-tests-recipe.md) | planned | — | 3, 4 | -| **6** | MCP half of trace (`trace` / `explore` / `node` tools) + update instructions | planned | PR 1, PR 4 | — | -| **7** | [`field-qualified-search`](./field-qualified-search.md) | planned | PR 1 | 4, 5 if `mcp-server.ts` untouched | -| **8** | [`agents-init-mcp-wiring`](./agents-init-mcp-wiring.md) | planned | PR 1 | 3–5 | -| **9** | [`agent-eval-harness`](./agent-eval-harness.md) | planned | PR 1, PR 8, allowlist | **last P1** | +| PR | Plans | Status | Blocked by | Parallel with | +| ----- | ----------------------------------------------------------------------------------------------------------------------------- | ------- | --------------------------------------------------------------------------------------------- | --------------------------------- | +| **3** | [`index-lock-and-error-log`](./index-lock-and-error-log.md) → [`parse-worker-hardening`](./parse-worker-hardening.md) (stack) | open | [#129](https://github.com/stainless-code/codemap/pull/129) merged; worker hardening in flight | 4, 5 | +| **4** | Recipe half of [`mcp-trace-explore-tools`](./mcp-trace-explore-tools.md) (`call-path`, `symbol-neighborhood` SQL + tests) | planned | — | 3, 5 | +| **5** | [`affected-tests-recipe`](./affected-tests-recipe.md) | planned | — | 3, 4 | +| **6** | MCP half of trace (`trace` / `explore` / 
`node` tools) + update instructions | planned | PR 1, PR 4 | — | +| **7** | [`field-qualified-search`](./field-qualified-search.md) | planned | PR 1 | 4, 5 if `mcp-server.ts` untouched | +| **8** | [`agents-init-mcp-wiring`](./agents-init-mcp-wiring.md) | planned | PR 1 | 3–5 | +| **9** | [`agent-eval-harness`](./agent-eval-harness.md) | planned | PR 1, PR 8, allowlist | **last P1** | **Parallelization constraints** diff --git a/src/application/index-engine.ts b/src/application/index-engine.ts index 467b411..8c7c90b 100644 --- a/src/application/index-engine.ts +++ b/src/application/index-engine.ts @@ -1,9 +1,8 @@ import { spawnSync } from "node:child_process"; -import { readFileSync, statSync, writeFileSync } from "node:fs"; +import { readFileSync, writeFileSync } from "node:fs"; import { extname, join } from "node:path"; import { LANG_MAP } from "../constants"; -import { extractCssData } from "../css-parser"; import { openDb, closeDb, @@ -43,14 +42,11 @@ import { META_FTS5_ENABLED_KEY, SCHEMA_VERSION, } from "../db"; -import type { CodemapDatabase, DynamicImportRow, FileRow } from "../db"; -import { countLines } from "../extractors/offsets"; +import type { CodemapDatabase, DynamicImportRow } from "../db"; import { filterRowsByChangedFiles } from "../git-changed"; import { globSync } from "../glob-sync"; import { hashContent } from "../hash"; -import { extractMarkers, extractSuppressions } from "../markers"; import type { ParsedFile } from "../parse-worker"; -import { extractFileData } from "../parser"; import { resolveImports, resolveModuleSpecifier } from "../resolver"; import { getExcludeDirNames, @@ -80,29 +76,6 @@ import type { export const VALID_EXTENSIONS = new Set(Object.keys(LANG_MAP)); -const TS_EXTENSIONS = new Set([ - ".ts", - ".tsx", - ".mts", - ".cts", - ".js", - ".jsx", - ".mjs", - ".cjs", -]); -const CSS_EXTENSIONS = new Set([".css"]); - -function langFromExt(ext: string): string { - return LANG_MAP[ext.toLowerCase()] ?? 
"text"; -} - -function fileCategory(path: string): "ts" | "css" | "text" { - const ext = extname(path).toLowerCase(); - if (TS_EXTENSIONS.has(ext)) return "ts"; - if (CSS_EXTENSIONS.has(ext)) return "css"; - return "text"; -} - function persistTierSubstrate( db: CodemapDatabase, relPath: string, @@ -284,6 +257,14 @@ function reportParseError(relPath: string, reason: string): void { } } +function countParseFailures(results: readonly ParsedFile[]): number { + let failures = 0; + for (const parsed of results) { + if (parsed.error || parsed.parseError) failures++; + } + return failures; +} + function insertParsedResults( db: CodemapDatabase, results: ParsedFile[], @@ -480,11 +461,13 @@ export async function indexFiles( let indexed = 0; let skipped = 0; + let parseFailures = 0; if (fullRebuild) { const parseStart = performance.now(); const results = await parseFilesParallel(filePaths); parseMs = performance.now() - parseStart; + parseFailures = countParseFailures(results); // relPath is always POSIX-normalized ASCII (toRelativePosix upstream); byte order suffices // for architecture.md § Sorted inserts' B-tree locality and skips the Intl-collator tax. results.sort((a, b) => @@ -504,129 +487,48 @@ export async function indexFiles( const existingHashes = options?.existingHashes ?? 
getAllFileHashes(db); const root = getProjectRoot(); const sourceCache = options?.sourceCache; + const toParse: string[] = []; + const readFailed: string[] = []; + + for (const relPath of filePaths) { + const absPath = join(root, relPath); + let hash: string; + const cached = sourceCache?.get(relPath); + if (cached !== undefined) { + hash = cached.hash; + } else { + try { + hash = hashContent(readFileSync(absPath, "utf-8")); + } catch { + readFailed.push(relPath); + continue; + } + } + + if (existingHashes.get(relPath) === hash) { + skipped++; + } else { + toParse.push(relPath); + } + } + + const parseStart = performance.now(); + const parsedResults = await parseFilesParallel(toParse); + parseMs = performance.now() - parseStart; + parseFailures = countParseFailures(parsedResults); const transaction = db.transaction(() => { const deleted = options?.deletedPaths ?? []; if (deleted.length > 0) { deleteFilesFromIndex(db, deleted, quiet); } - for (const relPath of filePaths) { - const absPath = join(root, relPath); - let source: string; - let hash: string; - // `--files` targeted reindex + cache-less callers fall through to read+hash. 
- const cached = sourceCache?.get(relPath); - if (cached !== undefined) { - source = cached.source; - hash = cached.hash; - } else { - try { - source = readFileSync(absPath, "utf-8"); - } catch { - deleteFileData(db, relPath); - continue; - } - hash = hashContent(source); - } - - if (existingHashes.get(relPath) === hash) { - skipped++; - continue; - } - + for (const relPath of readFailed) { deleteFileData(db, relPath); - - const stat = statSync(absPath); - const lineCount = countLines(source); - - const fileRow: FileRow = { - path: relPath, - content_hash: hash, - size: stat.size, - line_count: lineCount, - language: langFromExt(extname(relPath)), - last_modified: Math.floor(stat.mtimeMs), - indexed_at: Date.now(), - }; - insertFile(db, fileRow); - - if (getFts5Enabled()) { - upsertSourceFts(db, relPath, source); - } - - try { - const category = fileCategory(relPath); - - if (category === "text") { - const markers = extractMarkers(source, relPath); - if (markers.length) insertMarkers(db, markers); - } else if (category === "css") { - const cssData = extractCssData(absPath, source, relPath); - if (cssData.variables.length) { - insertCssVariables(db, cssData.variables); - } - if (cssData.classes.length) insertCssClasses(db, cssData.classes); - if (cssData.keyframes.length) { - insertCssKeyframes(db, cssData.keyframes); - } - if (cssData.markers.length) insertMarkers(db, cssData.markers); - if (cssData.importSources.length) { - insertImports( - db, - cssData.importSources.map((importSource) => ({ - file_path: relPath, - source: importSource, - resolved_path: null, - specifiers: "[]", - is_type_only: 0, - line_number: 0, - })), - ); - } - } else { - const data = extractFileData(absPath, source, relPath); - if (data.symbols.length) insertSymbols(db, data.symbols); - const deps = resolveImports(absPath, data.imports, indexedPaths); - insertImportsWithSpecifiers( - db, - data.imports, - data.importSpecifiers, - ); - if (data.scopes.length) insertScopes(db, data.scopes); - 
if (data.references.length) insertReferences(db, data.references); - if (data.fileMetrics) insertFileMetrics(db, [data.fileMetrics]); - if (data.functionParams.length) - insertFunctionParams(db, data.functionParams); - if (data.runtimeMarkers.length) - insertRuntimeMarkers(db, data.runtimeMarkers); - if (data.testSuites.length) insertTestSuites(db, data.testSuites); - if (deps.length) insertDependencies(db, deps); - if (data.exports.length) insertExports(db, data.exports); - if (data.components.length) insertComponents(db, data.components); - if (data.markers.length) insertMarkers(db, data.markers); - if (data.typeMembers.length) - insertTypeMembers(db, data.typeMembers); - if (data.calls.length) insertCalls(db, data.calls); - persistDynamicImports(db, absPath, data.dynamicImports); - persistTierSubstrate(db, relPath, data); - if (data.hasSideEffects) { - db.run("UPDATE files SET has_side_effects = 1 WHERE path = ?", [ - relPath, - ]); - } - } - // Category-agnostic: one regex pass over raw source, no AST needed. - const suppressions = extractSuppressions(source, relPath); - if (suppressions.length) insertSuppressions(db, suppressions); - } catch (err) { - reportParseError( - relPath, - err instanceof Error ? err.message : String(err), - ); - } - - indexed++; } + for (const parsed of parsedResults) { + deleteFileData(db, parsed.relPath); + } + indexed += insertParsedResults(db, parsedResults, indexedPaths); }); transaction(); @@ -718,7 +620,7 @@ export async function indexFiles( `\n Codemap ${fullRebuild ? "(full rebuild)" : "(incremental)"}`, ); console.log( - ` ${indexed} files indexed, ${skipped} unchanged, ${elapsed}ms`, + ` ${indexed} files indexed, ${skipped} unchanged${parseFailures > 0 ? 
`, ${parseFailures} parse failures` : ""}, ${elapsed}ms`, ); console.log(` ───────────────────────────────────`); for (const [key, value] of Object.entries(stats)) { diff --git a/src/application/parse-timeout.test.ts b/src/application/parse-timeout.test.ts new file mode 100644 index 0000000..d5395df --- /dev/null +++ b/src/application/parse-timeout.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, test } from "bun:test"; + +import { + DEFAULT_PARSE_TIMEOUT_MS, + MAX_PARSE_TIMEOUT_MS, + computeParseTimeoutMs, + parseParseTimeoutMsOverride, +} from "./parse-timeout"; + +describe("parseParseTimeoutMsOverride", () => { + test("accepts positive integers", () => { + expect(parseParseTimeoutMsOverride("5000")).toBe(5000); + }); + + test("rejects malformed values", () => { + expect(parseParseTimeoutMsOverride("0")).toBeNull(); + expect(parseParseTimeoutMsOverride("abc")).toBeNull(); + }); +}); + +describe("computeParseTimeoutMs", () => { + test("uses env override when set", () => { + expect(computeParseTimeoutMs(1_000_000, "15000")).toBe(15_000); + }); + + test("scales with file size up to cap", () => { + expect(computeParseTimeoutMs(0, undefined)).toBe(DEFAULT_PARSE_TIMEOUT_MS); + expect(computeParseTimeoutMs(5_000_000, undefined)).toBe(10_000 + 100); + expect(computeParseTimeoutMs(2_000_000_000, undefined)).toBe( + MAX_PARSE_TIMEOUT_MS, + ); + }); +}); diff --git a/src/application/parse-timeout.ts b/src/application/parse-timeout.ts new file mode 100644 index 0000000..1b33f4c --- /dev/null +++ b/src/application/parse-timeout.ts @@ -0,0 +1,50 @@ +/** Default per-file parse timeout floor (ms). */ +export const DEFAULT_PARSE_TIMEOUT_MS = 10_000; + +/** Hard cap on per-file parse timeout (ms). */ +export const MAX_PARSE_TIMEOUT_MS = 30_000; + +/** +1 ms per this many bytes between floor and cap. 
*/ +export const PARSE_TIMEOUT_BYTES_PER_MS = 50_000; + +const PARSE_TIMEOUT_ENV_RE = /^\d+$/; + +export function parseParseTimeoutMsOverride( + env: string | undefined, +): number | null { + if (env === undefined || env === "") return null; + if (!PARSE_TIMEOUT_ENV_RE.test(env)) return null; + const parsed = Number(env); + if (!Number.isSafeInteger(parsed) || parsed < 1) return null; + return parsed; +} + +/** + * Per-file parse budget: `CODEMAP_PARSE_TIMEOUT_MS` when set, else + * 10s + size scaling capped at 30s. + */ +export function computeParseTimeoutMs( + fileSizeBytes: number, + env: string | undefined = process.env.CODEMAP_PARSE_TIMEOUT_MS, +): number { + const override = parseParseTimeoutMsOverride(env); + if (override !== null) return override; + const scaled = + DEFAULT_PARSE_TIMEOUT_MS + + Math.floor(Math.max(0, fileSizeBytes) / PARSE_TIMEOUT_BYTES_PER_MS); + return Math.min(MAX_PARSE_TIMEOUT_MS, scaled); +} + +export class ParseTimeoutError extends Error { + readonly timeoutMs: number; + + constructor(timeoutMs: number) { + super(`parse timed out after ${timeoutMs}ms`); + this.name = "ParseTimeoutError"; + this.timeoutMs = timeoutMs; + } +} + +export function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/worker-pool.test.ts b/src/worker-pool.test.ts index ef892c1..4a27d8f 100644 --- a/src/worker-pool.test.ts +++ b/src/worker-pool.test.ts @@ -3,6 +3,7 @@ import { describe, expect, test } from "bun:test"; import { parseFilesParallel, parseParseWorkerCountOverride, + parseWorkerRecycleEvery, } from "./worker-pool"; describe("parseParseWorkerCountOverride", () => { @@ -26,6 +27,16 @@ describe("parseParseWorkerCountOverride", () => { }); }); +describe("parseWorkerRecycleEvery", () => { + test("defaults when unset", () => { + expect(parseWorkerRecycleEvery(undefined)).toBe(250); + }); + + test("accepts positive integers", () => { + expect(parseWorkerRecycleEvery("100")).toBe(100); + }); +}); + 
describe("parseFilesParallel", () => { test("resolves immediately for an empty file list", async () => { await expect(parseFilesParallel([])).resolves.toEqual([]); diff --git a/src/worker-pool.ts b/src/worker-pool.ts index 46cd6f5..545353c 100644 --- a/src/worker-pool.ts +++ b/src/worker-pool.ts @@ -1,8 +1,15 @@ +import { statSync } from "node:fs"; import { cpus } from "node:os"; -import { basename, dirname } from "node:path"; +import { basename, dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import { Worker as NodeWorker } from "node:worker_threads"; +import { + ParseTimeoutError, + computeParseTimeoutMs, + delay, + parseParseTimeoutMsOverride, +} from "./application/parse-timeout"; import { CODEMAP_BUILD_OUTPUT_DIR } from "./build-output"; import type { ParsedFile, WorkerInput, WorkerOutput } from "./parse-worker"; import { getFts5Enabled, getProjectRoot } from "./runtime"; @@ -21,6 +28,9 @@ const WORKER_URL_NODE = new URL( ); const PARSE_WORKER_COUNT_RE = /^\d+$/; +const RECYCLE_EVERY_RE = /^\d+$/; + +const DEFAULT_WORKER_RECYCLE_EVERY = 250; /** Returns clamped override [1, 32], or `null` when unset/empty/invalid. */ export function parseParseWorkerCountOverride( @@ -33,7 +43,26 @@ export function parseParseWorkerCountOverride( return Math.min(parsed, 32); } -// Override via `CODEMAP_PARSE_WORKERS` (clamped [1, 32]); default formula unchanged when unset. 
+export function parseWorkerRecycleEvery( + env: string | undefined = process.env.CODEMAP_WORKER_RECYCLE_EVERY, +): number { + if (env === undefined || env === "") return DEFAULT_WORKER_RECYCLE_EVERY; + if (!RECYCLE_EVERY_RE.test(env)) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_WORKER_RECYCLE_EVERY=${JSON.stringify(env)} (expected positive integer)`, + ); + return DEFAULT_WORKER_RECYCLE_EVERY; + } + const parsed = Number(env); + if (!Number.isSafeInteger(parsed) || parsed < 1) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_WORKER_RECYCLE_EVERY=${JSON.stringify(env)} (expected positive integer)`, + ); + return DEFAULT_WORKER_RECYCLE_EVERY; + } + return parsed; +} + function resolveWorkerCount(): number { const env = process.env.CODEMAP_PARSE_WORKERS; const override = parseParseWorkerCountOverride(env); @@ -50,51 +79,246 @@ const WORKER_COUNT = resolveWorkerCount(); const IS_BUN = typeof Bun !== "undefined"; const NODE_WORKER_PATH = IS_BUN ? "" : fileURLToPath(WORKER_URL_NODE); -export function parseFilesParallel(filePaths: string[]): Promise { - if (filePaths.length === 0) return Promise.resolve([]); - const chunkSize = Math.ceil(filePaths.length / WORKER_COUNT); - const chunks: string[][] = []; - for (let i = 0; i < filePaths.length; i += chunkSize) { - chunks.push(filePaths.slice(i, i + chunkSize)); - } +interface ParseWorkerSession { + parse(input: WorkerInput, timeoutMs: number): Promise; + dispose(): void; +} - const projectRoot = getProjectRoot(); - const fts5Enabled = getFts5Enabled(); +function createParseWorkerSession(): ParseWorkerSession { + let generation = 0; + let pending: + | { + gen: number; + resolve: (value: WorkerOutput) => void; + reject: (err: Error) => void; + } + | undefined; - return Promise.all( - chunks.map( - (chunk) => - new Promise((resolve, reject) => { - const input: WorkerInput = { - files: chunk, - projectRoot, - fts5Enabled, - }; - - if (IS_BUN) { - const worker = new Worker(WORKER_URL_BUN); - 
worker.onmessage = (event: MessageEvent) => { - resolve(event.data.results); - worker.terminate(); - }; - worker.onerror = (event: ErrorEvent) => { - reject(event.error ?? new Error(event.message)); - worker.terminate(); - }; + if (IS_BUN) { + let worker = new Worker(WORKER_URL_BUN); + const attach = (): void => { + worker.onmessage = (event: MessageEvent) => { + if (!pending || pending.gen !== generation) return; + const { resolve } = pending; + pending = undefined; + resolve(event.data); + }; + worker.onerror = (event: ErrorEvent) => { + if (!pending || pending.gen !== generation) return; + const { reject } = pending; + pending = undefined; + reject(event.error ?? new Error(event.message)); + }; + }; + attach(); + + return { + parse(input, timeoutMs) { + const gen = generation; + return Promise.race([ + new Promise((resolve, reject) => { + pending = { gen, resolve, reject }; worker.postMessage(input); - return; + }), + delay(timeoutMs).then(() => { + throw new ParseTimeoutError(timeoutMs); + }), + ]).catch((err) => { + if (err instanceof ParseTimeoutError) { + generation++; + pending = undefined; + worker.terminate(); + worker = new Worker(WORKER_URL_BUN); + attach(); } + throw err; + }); + }, + dispose() { + pending = undefined; + generation++; + worker.terminate(); + }, + }; + } - const worker = new NodeWorker(NODE_WORKER_PATH, { - type: "module", - } as import("node:worker_threads").WorkerOptions); - worker.on("message", (data: WorkerOutput) => { - resolve(data.results); - void worker.terminate(); - }); - worker.on("error", reject); + let worker = new NodeWorker(NODE_WORKER_PATH, { + type: "module", + } as import("node:worker_threads").WorkerOptions); + + const attachNode = (): void => { + worker.on("message", (data: WorkerOutput) => { + if (!pending || pending.gen !== generation) return; + const { resolve } = pending; + pending = undefined; + resolve(data); + }); + worker.on("error", (err: Error) => { + if (!pending || pending.gen !== generation) return; + 
const { reject } = pending; + pending = undefined; + reject(err); + }); + }; + attachNode(); + + return { + parse(input, timeoutMs) { + const gen = generation; + return Promise.race([ + new Promise((resolve, reject) => { + pending = { gen, resolve, reject }; worker.postMessage(input); }), - ), - ).then((parts) => parts.flat()); + delay(timeoutMs).then(() => { + throw new ParseTimeoutError(timeoutMs); + }), + ]).catch((err) => { + if (err instanceof ParseTimeoutError) { + generation++; + pending = undefined; + void worker.terminate(); + worker = new NodeWorker(NODE_WORKER_PATH, { + type: "module", + } as import("node:worker_threads").WorkerOptions); + attachNode(); + } + throw err; + }); + }, + dispose() { + pending = undefined; + generation++; + void worker.terminate(); + }, + }; +} + +function fileSizeBytes(projectRoot: string, relPath: string): number { + try { + return statSync(join(projectRoot, relPath)).size; + } catch { + return 0; + } +} + +function timeoutParsedFile( + relPath: string, + projectRoot: string, + timeoutMs: number, +): ParsedFile { + const reason = `parse timed out after ${timeoutMs}ms`; + try { + const stat = statSync(join(projectRoot, relPath)); + return { + relPath, + parseError: reason, + fileRow: { + path: relPath, + content_hash: "", + size: stat.size, + line_count: 0, + language: "text", + last_modified: Math.floor(stat.mtimeMs), + indexed_at: Date.now(), + }, + category: "text", + }; + } catch { + return { + relPath, + error: true, + fileRow: {} as ParsedFile["fileRow"], + category: "text", + }; + } +} + +async function parseOneFile( + session: ParseWorkerSession, + relPath: string, + projectRoot: string, + fts5Enabled: boolean, + timeoutEnv: string | undefined, +): Promise { + const timeoutMs = computeParseTimeoutMs( + fileSizeBytes(projectRoot, relPath), + timeoutEnv, + ); + const input: WorkerInput = { + files: [relPath], + projectRoot, + fts5Enabled, + }; + try { + const output = await session.parse(input, timeoutMs); + return ( + 
output.results[0] ?? timeoutParsedFile(relPath, projectRoot, timeoutMs) + ); + } catch (err) { + if (err instanceof ParseTimeoutError) { + return timeoutParsedFile(relPath, projectRoot, err.timeoutMs); + } + throw err; + } +} + +export function parseFilesParallel(filePaths: string[]): Promise { + if (filePaths.length === 0) return Promise.resolve([]); + + const projectRoot = getProjectRoot(); + const fts5Enabled = getFts5Enabled(); + const timeoutEnv = process.env.CODEMAP_PARSE_TIMEOUT_MS; + if ( + timeoutEnv !== undefined && + timeoutEnv !== "" && + parseParseTimeoutMsOverride(timeoutEnv) === null + ) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_PARSE_TIMEOUT_MS=${JSON.stringify(timeoutEnv)} (expected positive integer)`, + ); + } + const recycleEvery = parseWorkerRecycleEvery(); + return runPool(filePaths, projectRoot, fts5Enabled, timeoutEnv, recycleEvery); +} + +async function runPool( + filePaths: string[], + projectRoot: string, + fts5Enabled: boolean, + timeoutEnv: string | undefined, + recycleEvery: number, +): Promise { + const results: ParsedFile[] = Array.from({ length: filePaths.length }); + let nextIndex = 0; + + async function workerLoop(): Promise { + let session = createParseWorkerSession(); + let processed = 0; + try { + while (true) { + const index = nextIndex++; + if (index >= filePaths.length) break; + const relPath = filePaths[index]!; + results[index] = await parseOneFile( + session, + relPath, + projectRoot, + fts5Enabled, + timeoutEnv, + ); + processed++; + if (processed >= recycleEvery) { + session.dispose(); + session = createParseWorkerSession(); + processed = 0; + } + } + } finally { + session.dispose(); + } + } + + await Promise.all(Array.from({ length: WORKER_COUNT }, () => workerLoop())); + return results; } From 467fe68ff52240ba431acbbcd83ed7a8cb45b15f Mon Sep 17 00:00:00 2001 From: Sutu Sebastian Date: Mon, 25 May 2026 11:58:54 +0300 Subject: [PATCH 2/2] fix(worker-pool): restore chunk parallelism for throughput Keep 
per-file timeout via chunk bisection on timeout, inline parse for small batches (<=12 files), and worker recycle. Fixes CLI integration test timeouts from per-message worker spawn overhead. --- src/worker-pool.ts | 153 +++++++++++++++++++++++++++++++++------------ 1 file changed, 114 insertions(+), 39 deletions(-) diff --git a/src/worker-pool.ts b/src/worker-pool.ts index 545353c..af5ed8f 100644 --- a/src/worker-pool.ts +++ b/src/worker-pool.ts @@ -12,6 +12,7 @@ import { } from "./application/parse-timeout"; import { CODEMAP_BUILD_OUTPUT_DIR } from "./build-output"; import type { ParsedFile, WorkerInput, WorkerOutput } from "./parse-worker"; +import { parseWorkerInput } from "./parse-worker-core"; import { getFts5Enabled, getProjectRoot } from "./runtime"; const fromDist = @@ -31,6 +32,10 @@ const PARSE_WORKER_COUNT_RE = /^\d+$/; const RECYCLE_EVERY_RE = /^\d+$/; const DEFAULT_WORKER_RECYCLE_EVERY = 250; +/** Avoid worker spawn tax on tiny targeted/incremental batches. */ +const INLINE_PARSE_MAX = 12; +/** Cap a single worker message budget so one hung file cannot block a huge chunk until sum(timeouts). */ +const CHUNK_TIMEOUT_CAP_MS = 120_000; /** Returns clamped override [1, 32], or `null` when unset/empty/invalid. 
*/ export function parseParseWorkerCountOverride( @@ -202,6 +207,21 @@ function fileSizeBytes(projectRoot: string, relPath: string): number { } } +function chunkBudgetMs( + files: readonly string[], + projectRoot: string, + timeoutEnv: string | undefined, +): number { + let sum = 0; + for (const relPath of files) { + sum += computeParseTimeoutMs( + fileSizeBytes(projectRoot, relPath), + timeoutEnv, + ); + } + return Math.min(sum, CHUNK_TIMEOUT_CAP_MS); +} + function timeoutParsedFile( relPath: string, projectRoot: string, @@ -263,6 +283,61 @@ async function parseOneFile( } } +async function parseChunkFiles( + session: ParseWorkerSession, + files: readonly string[], + projectRoot: string, + fts5Enabled: boolean, + timeoutEnv: string | undefined, +): Promise { + if (files.length === 0) return []; + if (files.length === 1) { + return [ + await parseOneFile( + session, + files[0]!, + projectRoot, + fts5Enabled, + timeoutEnv, + ), + ]; + } + + const timeoutMs = chunkBudgetMs(files, projectRoot, timeoutEnv); + const input: WorkerInput = { files: [...files], projectRoot, fts5Enabled }; + try { + const output = await session.parse(input, timeoutMs); + return output.results; + } catch (err) { + if (!(err instanceof ParseTimeoutError)) throw err; + const mid = Math.ceil(files.length / 2); + const left = await parseChunkFiles( + session, + files.slice(0, mid), + projectRoot, + fts5Enabled, + timeoutEnv, + ); + const right = await parseChunkFiles( + session, + files.slice(mid), + projectRoot, + fts5Enabled, + timeoutEnv, + ); + return [...left, ...right]; + } +} + +function splitWorkerChunks(filePaths: readonly string[]): string[][] { + const chunkSize = Math.ceil(filePaths.length / WORKER_COUNT); + const chunks: string[][] = []; + for (let i = 0; i < filePaths.length; i += chunkSize) { + chunks.push(filePaths.slice(i, i + chunkSize)); + } + return chunks; +} + export function parseFilesParallel(filePaths: string[]): Promise { if (filePaths.length === 0) return 
Promise.resolve([]); @@ -278,47 +353,47 @@ export function parseFilesParallel(filePaths: string[]): Promise { `[worker-pool] ignoring invalid CODEMAP_PARSE_TIMEOUT_MS=${JSON.stringify(timeoutEnv)} (expected positive integer)`, ); } - const recycleEvery = parseWorkerRecycleEvery(); - return runPool(filePaths, projectRoot, fts5Enabled, timeoutEnv, recycleEvery); -} -async function runPool( - filePaths: string[], - projectRoot: string, - fts5Enabled: boolean, - timeoutEnv: string | undefined, - recycleEvery: number, -): Promise { - const results: ParsedFile[] = Array.from({ length: filePaths.length }); - let nextIndex = 0; + if (filePaths.length <= INLINE_PARSE_MAX) { + return Promise.resolve( + parseWorkerInput({ files: [...filePaths], projectRoot, fts5Enabled }) + .results, + ); + } + + const recycleEvery = parseWorkerRecycleEvery(); + const chunks = splitWorkerChunks(filePaths); - async function workerLoop(): Promise { - let session = createParseWorkerSession(); - let processed = 0; - try { - while (true) { - const index = nextIndex++; - if (index >= filePaths.length) break; - const relPath = filePaths[index]!; - results[index] = await parseOneFile( - session, - relPath, - projectRoot, - fts5Enabled, - timeoutEnv, - ); - processed++; - if (processed >= recycleEvery) { - session.dispose(); - session = createParseWorkerSession(); - processed = 0; + return Promise.all( + chunks.map(async (chunk) => { + let session = createParseWorkerSession(); + let processed = 0; + try { + const results: ParsedFile[] = []; + for (let i = 0; i < chunk.length; ) { + const sliceEnd = Math.min(i + recycleEvery, chunk.length); + const slice = chunk.slice(i, sliceEnd); + results.push( + ...(await parseChunkFiles( + session, + slice, + projectRoot, + fts5Enabled, + timeoutEnv, + )), + ); + processed += slice.length; + i = sliceEnd; + if (processed >= recycleEvery && i < chunk.length) { + session.dispose(); + session = createParseWorkerSession(); + processed = 0; + } } + return results; + } 
finally { + session.dispose(); } - } finally { - session.dispose(); - } - } - - await Promise.all(Array.from({ length: WORKER_COUNT }, () => workerLoop())); - return results; + }), + ).then((parts) => parts.flat()); }