diff --git a/.changeset/parse-worker-hardening.md b/.changeset/parse-worker-hardening.md new file mode 100644 index 0000000..9a25275 --- /dev/null +++ b/.changeset/parse-worker-hardening.md @@ -0,0 +1,5 @@ +--- +"@stainless-code/codemap": minor +--- + +Add per-file parse timeouts with worker recycling during full and incremental indexing; parse failures are logged to errors.log and appear in the index summary. diff --git a/docs/plans/agent-surface-delivery.md b/docs/plans/agent-surface-delivery.md index f621f28..1c3de56 100644 --- a/docs/plans/agent-surface-delivery.md +++ b/docs/plans/agent-surface-delivery.md @@ -10,11 +10,11 @@ ## Quick resume -| Next action | Detail | -| -------------------- | ------------------------------------------------------------------------------------------------------------------------------------ | -| **Review / merge** | PR 3 — index lock + error log (branch `feat/index-lock`) when open | -| **Start next** | **PR 4** — trace recipes (`call-path`, `symbol-neighborhood`) or **PR 5** — `affected-tests-recipe` (parallel with 4 after 3 merges) | -| **Do not start yet** | PR 6 (MCP trace tools) until PR 4 land; PR 9 (eval harness) until PR 8 | +| Next action | Detail | +| -------------------- | ------------------------------------------------------------------------------------------ | +| **Review / merge** | PR 3 stack — parse worker hardening (`feat/parse-worker-hardening`) when open | +| **Start next** | **PR 4** — trace recipes or **PR 5** — `affected-tests-recipe` (parallel after PR 3 lands) | +| **Do not start yet** | PR 6 (MCP trace tools) until PR 4 lands; PR 9 (eval harness) until PR 8 | Update the table below when a PR merges or a new branch opens. @@ -35,15 +35,15 @@ Merge each PR to `main` directly. No long-lived integration branch (`feat/agent- Max **3 parallel tracks** at once. 
-| PR | Plans | Status | Blocked by | Parallel with | -| ----- | ----------------------------------------------------------------------------------------------------------------------------- | ------- | --------------------- | --------------------------------- | -| **3** | [`index-lock-and-error-log`](./index-lock-and-error-log.md) → [`parse-worker-hardening`](./parse-worker-hardening.md) (stack) | open | PR 2 merged | 4, 5 | -| **4** | Recipe half of [`mcp-trace-explore-tools`](./mcp-trace-explore-tools.md) (`call-path`, `symbol-neighborhood` SQL + tests) | planned | — | 3, 5 | -| **5** | [`affected-tests-recipe`](./affected-tests-recipe.md) | planned | — | 3, 4 | -| **6** | MCP half of trace (`trace` / `explore` / `node` tools) + update instructions | planned | PR 1, PR 4 | — | -| **7** | [`field-qualified-search`](./field-qualified-search.md) | planned | PR 1 | 4, 5 if `mcp-server.ts` untouched | -| **8** | [`agents-init-mcp-wiring`](./agents-init-mcp-wiring.md) | planned | PR 1 | 3–5 | -| **9** | [`agent-eval-harness`](./agent-eval-harness.md) | planned | PR 1, PR 8, allowlist | **last P1** | +| PR | Plans | Status | Blocked by | Parallel with | +| ----- | ----------------------------------------------------------------------------------------------------------------------------- | ------- | --------------------------------------------------------------------------------------------- | --------------------------------- | +| **3** | [`index-lock-and-error-log`](./index-lock-and-error-log.md) → [`parse-worker-hardening`](./parse-worker-hardening.md) (stack) | open | [#129](https://github.com/stainless-code/codemap/pull/129) merged; worker hardening in flight | 4, 5 | +| **4** | Recipe half of [`mcp-trace-explore-tools`](./mcp-trace-explore-tools.md) (`call-path`, `symbol-neighborhood` SQL + tests) | planned | — | 3, 5 | +| **5** | [`affected-tests-recipe`](./affected-tests-recipe.md) | planned | — | 3, 4 | +| **6** | MCP half of trace (`trace` / `explore` / 
`node` tools) + update instructions | planned | PR 1, PR 4 | — | +| **7** | [`field-qualified-search`](./field-qualified-search.md) | planned | PR 1 | 4, 5 if `mcp-server.ts` untouched | +| **8** | [`agents-init-mcp-wiring`](./agents-init-mcp-wiring.md) | planned | PR 1 | 3–5 | +| **9** | [`agent-eval-harness`](./agent-eval-harness.md) | planned | PR 1, PR 8, allowlist | **last P1** | **Parallelization constraints** diff --git a/src/application/index-engine.ts b/src/application/index-engine.ts index 467b411..8c7c90b 100644 --- a/src/application/index-engine.ts +++ b/src/application/index-engine.ts @@ -1,9 +1,8 @@ import { spawnSync } from "node:child_process"; -import { readFileSync, statSync, writeFileSync } from "node:fs"; +import { readFileSync, writeFileSync } from "node:fs"; import { extname, join } from "node:path"; import { LANG_MAP } from "../constants"; -import { extractCssData } from "../css-parser"; import { openDb, closeDb, @@ -43,14 +42,11 @@ import { META_FTS5_ENABLED_KEY, SCHEMA_VERSION, } from "../db"; -import type { CodemapDatabase, DynamicImportRow, FileRow } from "../db"; -import { countLines } from "../extractors/offsets"; +import type { CodemapDatabase, DynamicImportRow } from "../db"; import { filterRowsByChangedFiles } from "../git-changed"; import { globSync } from "../glob-sync"; import { hashContent } from "../hash"; -import { extractMarkers, extractSuppressions } from "../markers"; import type { ParsedFile } from "../parse-worker"; -import { extractFileData } from "../parser"; import { resolveImports, resolveModuleSpecifier } from "../resolver"; import { getExcludeDirNames, @@ -80,29 +76,6 @@ import type { export const VALID_EXTENSIONS = new Set(Object.keys(LANG_MAP)); -const TS_EXTENSIONS = new Set([ - ".ts", - ".tsx", - ".mts", - ".cts", - ".js", - ".jsx", - ".mjs", - ".cjs", -]); -const CSS_EXTENSIONS = new Set([".css"]); - -function langFromExt(ext: string): string { - return LANG_MAP[ext.toLowerCase()] ?? 
"text"; -} - -function fileCategory(path: string): "ts" | "css" | "text" { - const ext = extname(path).toLowerCase(); - if (TS_EXTENSIONS.has(ext)) return "ts"; - if (CSS_EXTENSIONS.has(ext)) return "css"; - return "text"; -} - function persistTierSubstrate( db: CodemapDatabase, relPath: string, @@ -284,6 +257,14 @@ function reportParseError(relPath: string, reason: string): void { } } +function countParseFailures(results: readonly ParsedFile[]): number { + let failures = 0; + for (const parsed of results) { + if (parsed.error || parsed.parseError) failures++; + } + return failures; +} + function insertParsedResults( db: CodemapDatabase, results: ParsedFile[], @@ -480,11 +461,13 @@ export async function indexFiles( let indexed = 0; let skipped = 0; + let parseFailures = 0; if (fullRebuild) { const parseStart = performance.now(); const results = await parseFilesParallel(filePaths); parseMs = performance.now() - parseStart; + parseFailures = countParseFailures(results); // relPath is always POSIX-normalized ASCII (toRelativePosix upstream); byte order suffices // for architecture.md § Sorted inserts' B-tree locality and skips the Intl-collator tax. results.sort((a, b) => @@ -504,129 +487,48 @@ export async function indexFiles( const existingHashes = options?.existingHashes ?? 
getAllFileHashes(db); const root = getProjectRoot(); const sourceCache = options?.sourceCache; + const toParse: string[] = []; + const readFailed: string[] = []; + + for (const relPath of filePaths) { + const absPath = join(root, relPath); + let hash: string; + const cached = sourceCache?.get(relPath); + if (cached !== undefined) { + hash = cached.hash; + } else { + try { + hash = hashContent(readFileSync(absPath, "utf-8")); + } catch { + readFailed.push(relPath); + continue; + } + } + + if (existingHashes.get(relPath) === hash) { + skipped++; + } else { + toParse.push(relPath); + } + } + + const parseStart = performance.now(); + const parsedResults = await parseFilesParallel(toParse); + parseMs = performance.now() - parseStart; + parseFailures = countParseFailures(parsedResults); const transaction = db.transaction(() => { const deleted = options?.deletedPaths ?? []; if (deleted.length > 0) { deleteFilesFromIndex(db, deleted, quiet); } - for (const relPath of filePaths) { - const absPath = join(root, relPath); - let source: string; - let hash: string; - // `--files` targeted reindex + cache-less callers fall through to read+hash. 
- const cached = sourceCache?.get(relPath); - if (cached !== undefined) { - source = cached.source; - hash = cached.hash; - } else { - try { - source = readFileSync(absPath, "utf-8"); - } catch { - deleteFileData(db, relPath); - continue; - } - hash = hashContent(source); - } - - if (existingHashes.get(relPath) === hash) { - skipped++; - continue; - } - + for (const relPath of readFailed) { deleteFileData(db, relPath); - - const stat = statSync(absPath); - const lineCount = countLines(source); - - const fileRow: FileRow = { - path: relPath, - content_hash: hash, - size: stat.size, - line_count: lineCount, - language: langFromExt(extname(relPath)), - last_modified: Math.floor(stat.mtimeMs), - indexed_at: Date.now(), - }; - insertFile(db, fileRow); - - if (getFts5Enabled()) { - upsertSourceFts(db, relPath, source); - } - - try { - const category = fileCategory(relPath); - - if (category === "text") { - const markers = extractMarkers(source, relPath); - if (markers.length) insertMarkers(db, markers); - } else if (category === "css") { - const cssData = extractCssData(absPath, source, relPath); - if (cssData.variables.length) { - insertCssVariables(db, cssData.variables); - } - if (cssData.classes.length) insertCssClasses(db, cssData.classes); - if (cssData.keyframes.length) { - insertCssKeyframes(db, cssData.keyframes); - } - if (cssData.markers.length) insertMarkers(db, cssData.markers); - if (cssData.importSources.length) { - insertImports( - db, - cssData.importSources.map((importSource) => ({ - file_path: relPath, - source: importSource, - resolved_path: null, - specifiers: "[]", - is_type_only: 0, - line_number: 0, - })), - ); - } - } else { - const data = extractFileData(absPath, source, relPath); - if (data.symbols.length) insertSymbols(db, data.symbols); - const deps = resolveImports(absPath, data.imports, indexedPaths); - insertImportsWithSpecifiers( - db, - data.imports, - data.importSpecifiers, - ); - if (data.scopes.length) insertScopes(db, data.scopes); - 
if (data.references.length) insertReferences(db, data.references); - if (data.fileMetrics) insertFileMetrics(db, [data.fileMetrics]); - if (data.functionParams.length) - insertFunctionParams(db, data.functionParams); - if (data.runtimeMarkers.length) - insertRuntimeMarkers(db, data.runtimeMarkers); - if (data.testSuites.length) insertTestSuites(db, data.testSuites); - if (deps.length) insertDependencies(db, deps); - if (data.exports.length) insertExports(db, data.exports); - if (data.components.length) insertComponents(db, data.components); - if (data.markers.length) insertMarkers(db, data.markers); - if (data.typeMembers.length) - insertTypeMembers(db, data.typeMembers); - if (data.calls.length) insertCalls(db, data.calls); - persistDynamicImports(db, absPath, data.dynamicImports); - persistTierSubstrate(db, relPath, data); - if (data.hasSideEffects) { - db.run("UPDATE files SET has_side_effects = 1 WHERE path = ?", [ - relPath, - ]); - } - } - // Category-agnostic: one regex pass over raw source, no AST needed. - const suppressions = extractSuppressions(source, relPath); - if (suppressions.length) insertSuppressions(db, suppressions); - } catch (err) { - reportParseError( - relPath, - err instanceof Error ? err.message : String(err), - ); - } - - indexed++; } + for (const parsed of parsedResults) { + deleteFileData(db, parsed.relPath); + } + indexed += insertParsedResults(db, parsedResults, indexedPaths); }); transaction(); @@ -718,7 +620,7 @@ export async function indexFiles( `\n Codemap ${fullRebuild ? "(full rebuild)" : "(incremental)"}`, ); console.log( - ` ${indexed} files indexed, ${skipped} unchanged, ${elapsed}ms`, + ` ${indexed} files indexed, ${skipped} unchanged${parseFailures > 0 ? 
`, ${parseFailures} parse failures` : ""}, ${elapsed}ms`, ); console.log(` ───────────────────────────────────`); for (const [key, value] of Object.entries(stats)) { diff --git a/src/application/parse-timeout.test.ts b/src/application/parse-timeout.test.ts new file mode 100644 index 0000000..d5395df --- /dev/null +++ b/src/application/parse-timeout.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, test } from "bun:test"; + +import { + DEFAULT_PARSE_TIMEOUT_MS, + MAX_PARSE_TIMEOUT_MS, + computeParseTimeoutMs, + parseParseTimeoutMsOverride, +} from "./parse-timeout"; + +describe("parseParseTimeoutMsOverride", () => { + test("accepts positive integers", () => { + expect(parseParseTimeoutMsOverride("5000")).toBe(5000); + }); + + test("rejects malformed values", () => { + expect(parseParseTimeoutMsOverride("0")).toBeNull(); + expect(parseParseTimeoutMsOverride("abc")).toBeNull(); + }); +}); + +describe("computeParseTimeoutMs", () => { + test("uses env override when set", () => { + expect(computeParseTimeoutMs(1_000_000, "15000")).toBe(15_000); + }); + + test("scales with file size up to cap", () => { + expect(computeParseTimeoutMs(0, undefined)).toBe(DEFAULT_PARSE_TIMEOUT_MS); + expect(computeParseTimeoutMs(5_000_000, undefined)).toBe(10_000 + 100); + expect(computeParseTimeoutMs(2_000_000_000, undefined)).toBe( + MAX_PARSE_TIMEOUT_MS, + ); + }); +}); diff --git a/src/application/parse-timeout.ts b/src/application/parse-timeout.ts new file mode 100644 index 0000000..1b33f4c --- /dev/null +++ b/src/application/parse-timeout.ts @@ -0,0 +1,50 @@ +/** Default per-file parse timeout floor (ms). */ +export const DEFAULT_PARSE_TIMEOUT_MS = 10_000; + +/** Hard cap on per-file parse timeout (ms). */ +export const MAX_PARSE_TIMEOUT_MS = 30_000; + +/** +1 ms per this many bytes between floor and cap. 
*/ +export const PARSE_TIMEOUT_BYTES_PER_MS = 50_000; + +const PARSE_TIMEOUT_ENV_RE = /^\d+$/; + +export function parseParseTimeoutMsOverride( + env: string | undefined, +): number | null { + if (env === undefined || env === "") return null; + if (!PARSE_TIMEOUT_ENV_RE.test(env)) return null; + const parsed = Number(env); + if (!Number.isSafeInteger(parsed) || parsed < 1) return null; + return parsed; +} + +/** + * Per-file parse budget: `CODEMAP_PARSE_TIMEOUT_MS` when set, else + * 10s + size scaling capped at 30s. + */ +export function computeParseTimeoutMs( + fileSizeBytes: number, + env: string | undefined = process.env.CODEMAP_PARSE_TIMEOUT_MS, +): number { + const override = parseParseTimeoutMsOverride(env); + if (override !== null) return override; + const scaled = + DEFAULT_PARSE_TIMEOUT_MS + + Math.floor(Math.max(0, fileSizeBytes) / PARSE_TIMEOUT_BYTES_PER_MS); + return Math.min(MAX_PARSE_TIMEOUT_MS, scaled); +} + +export class ParseTimeoutError extends Error { + readonly timeoutMs: number; + + constructor(timeoutMs: number) { + super(`parse timed out after ${timeoutMs}ms`); + this.name = "ParseTimeoutError"; + this.timeoutMs = timeoutMs; + } +} + +export function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/worker-pool.test.ts b/src/worker-pool.test.ts index ef892c1..4a27d8f 100644 --- a/src/worker-pool.test.ts +++ b/src/worker-pool.test.ts @@ -3,6 +3,7 @@ import { describe, expect, test } from "bun:test"; import { parseFilesParallel, parseParseWorkerCountOverride, + parseWorkerRecycleEvery, } from "./worker-pool"; describe("parseParseWorkerCountOverride", () => { @@ -26,6 +27,16 @@ describe("parseParseWorkerCountOverride", () => { }); }); +describe("parseWorkerRecycleEvery", () => { + test("defaults when unset", () => { + expect(parseWorkerRecycleEvery(undefined)).toBe(250); + }); + + test("accepts positive integers", () => { + expect(parseWorkerRecycleEvery("100")).toBe(100); + }); +}); + 
describe("parseFilesParallel", () => { test("resolves immediately for an empty file list", async () => { await expect(parseFilesParallel([])).resolves.toEqual([]); diff --git a/src/worker-pool.ts b/src/worker-pool.ts index 46cd6f5..af5ed8f 100644 --- a/src/worker-pool.ts +++ b/src/worker-pool.ts @@ -1,10 +1,18 @@ +import { statSync } from "node:fs"; import { cpus } from "node:os"; -import { basename, dirname } from "node:path"; +import { basename, dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import { Worker as NodeWorker } from "node:worker_threads"; +import { + ParseTimeoutError, + computeParseTimeoutMs, + delay, + parseParseTimeoutMsOverride, +} from "./application/parse-timeout"; import { CODEMAP_BUILD_OUTPUT_DIR } from "./build-output"; import type { ParsedFile, WorkerInput, WorkerOutput } from "./parse-worker"; +import { parseWorkerInput } from "./parse-worker-core"; import { getFts5Enabled, getProjectRoot } from "./runtime"; const fromDist = @@ -21,6 +29,13 @@ const WORKER_URL_NODE = new URL( ); const PARSE_WORKER_COUNT_RE = /^\d+$/; +const RECYCLE_EVERY_RE = /^\d+$/; + +const DEFAULT_WORKER_RECYCLE_EVERY = 250; +/** Avoid worker spawn tax on tiny targeted/incremental batches. */ +const INLINE_PARSE_MAX = 12; +/** Cap a single worker message budget so one hung file cannot block a huge chunk until sum(timeouts). */ +const CHUNK_TIMEOUT_CAP_MS = 120_000; /** Returns clamped override [1, 32], or `null` when unset/empty/invalid. */ export function parseParseWorkerCountOverride( @@ -33,7 +48,26 @@ export function parseParseWorkerCountOverride( return Math.min(parsed, 32); } -// Override via `CODEMAP_PARSE_WORKERS` (clamped [1, 32]); default formula unchanged when unset. 
+export function parseWorkerRecycleEvery( + env: string | undefined = process.env.CODEMAP_WORKER_RECYCLE_EVERY, +): number { + if (env === undefined || env === "") return DEFAULT_WORKER_RECYCLE_EVERY; + if (!RECYCLE_EVERY_RE.test(env)) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_WORKER_RECYCLE_EVERY=${JSON.stringify(env)} (expected positive integer)`, + ); + return DEFAULT_WORKER_RECYCLE_EVERY; + } + const parsed = Number(env); + if (!Number.isSafeInteger(parsed) || parsed < 1) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_WORKER_RECYCLE_EVERY=${JSON.stringify(env)} (expected positive integer)`, + ); + return DEFAULT_WORKER_RECYCLE_EVERY; + } + return parsed; +} + function resolveWorkerCount(): number { const env = process.env.CODEMAP_PARSE_WORKERS; const override = parseParseWorkerCountOverride(env); @@ -50,51 +84,316 @@ const WORKER_COUNT = resolveWorkerCount(); const IS_BUN = typeof Bun !== "undefined"; const NODE_WORKER_PATH = IS_BUN ? "" : fileURLToPath(WORKER_URL_NODE); -export function parseFilesParallel(filePaths: string[]): Promise { - if (filePaths.length === 0) return Promise.resolve([]); +interface ParseWorkerSession { + parse(input: WorkerInput, timeoutMs: number): Promise; + dispose(): void; +} + +function createParseWorkerSession(): ParseWorkerSession { + let generation = 0; + let pending: + | { + gen: number; + resolve: (value: WorkerOutput) => void; + reject: (err: Error) => void; + } + | undefined; + + if (IS_BUN) { + let worker = new Worker(WORKER_URL_BUN); + const attach = (): void => { + worker.onmessage = (event: MessageEvent) => { + if (!pending || pending.gen !== generation) return; + const { resolve } = pending; + pending = undefined; + resolve(event.data); + }; + worker.onerror = (event: ErrorEvent) => { + if (!pending || pending.gen !== generation) return; + const { reject } = pending; + pending = undefined; + reject(event.error ?? 
new Error(event.message)); + }; + }; + attach(); + + return { + parse(input, timeoutMs) { + const gen = generation; + return Promise.race([ + new Promise((resolve, reject) => { + pending = { gen, resolve, reject }; + worker.postMessage(input); + }), + delay(timeoutMs).then(() => { + throw new ParseTimeoutError(timeoutMs); + }), + ]).catch((err) => { + if (err instanceof ParseTimeoutError) { + generation++; + pending = undefined; + worker.terminate(); + worker = new Worker(WORKER_URL_BUN); + attach(); + } + throw err; + }); + }, + dispose() { + pending = undefined; + generation++; + worker.terminate(); + }, + }; + } + + let worker = new NodeWorker(NODE_WORKER_PATH, { + type: "module", + } as import("node:worker_threads").WorkerOptions); + + const attachNode = (): void => { + worker.on("message", (data: WorkerOutput) => { + if (!pending || pending.gen !== generation) return; + const { resolve } = pending; + pending = undefined; + resolve(data); + }); + worker.on("error", (err: Error) => { + if (!pending || pending.gen !== generation) return; + const { reject } = pending; + pending = undefined; + reject(err); + }); + }; + attachNode(); + + return { + parse(input, timeoutMs) { + const gen = generation; + return Promise.race([ + new Promise((resolve, reject) => { + pending = { gen, resolve, reject }; + worker.postMessage(input); + }), + delay(timeoutMs).then(() => { + throw new ParseTimeoutError(timeoutMs); + }), + ]).catch((err) => { + if (err instanceof ParseTimeoutError) { + generation++; + pending = undefined; + void worker.terminate(); + worker = new NodeWorker(NODE_WORKER_PATH, { + type: "module", + } as import("node:worker_threads").WorkerOptions); + attachNode(); + } + throw err; + }); + }, + dispose() { + pending = undefined; + generation++; + void worker.terminate(); + }, + }; +} + +function fileSizeBytes(projectRoot: string, relPath: string): number { + try { + return statSync(join(projectRoot, relPath)).size; + } catch { + return 0; + } +} + +function 
chunkBudgetMs( + files: readonly string[], + projectRoot: string, + timeoutEnv: string | undefined, +): number { + let sum = 0; + for (const relPath of files) { + sum += computeParseTimeoutMs( + fileSizeBytes(projectRoot, relPath), + timeoutEnv, + ); + } + return Math.min(sum, CHUNK_TIMEOUT_CAP_MS); +} + +function timeoutParsedFile( + relPath: string, + projectRoot: string, + timeoutMs: number, +): ParsedFile { + const reason = `parse timed out after ${timeoutMs}ms`; + try { + const stat = statSync(join(projectRoot, relPath)); + return { + relPath, + parseError: reason, + fileRow: { + path: relPath, + content_hash: "", + size: stat.size, + line_count: 0, + language: "text", + last_modified: Math.floor(stat.mtimeMs), + indexed_at: Date.now(), + }, + category: "text", + }; + } catch { + return { + relPath, + error: true, + fileRow: {} as ParsedFile["fileRow"], + category: "text", + }; + } +} + +async function parseOneFile( + session: ParseWorkerSession, + relPath: string, + projectRoot: string, + fts5Enabled: boolean, + timeoutEnv: string | undefined, +): Promise { + const timeoutMs = computeParseTimeoutMs( + fileSizeBytes(projectRoot, relPath), + timeoutEnv, + ); + const input: WorkerInput = { + files: [relPath], + projectRoot, + fts5Enabled, + }; + try { + const output = await session.parse(input, timeoutMs); + return ( + output.results[0] ?? 
timeoutParsedFile(relPath, projectRoot, timeoutMs) + ); + } catch (err) { + if (err instanceof ParseTimeoutError) { + return timeoutParsedFile(relPath, projectRoot, err.timeoutMs); + } + throw err; + } +} + +async function parseChunkFiles( + session: ParseWorkerSession, + files: readonly string[], + projectRoot: string, + fts5Enabled: boolean, + timeoutEnv: string | undefined, +): Promise { + if (files.length === 0) return []; + if (files.length === 1) { + return [ + await parseOneFile( + session, + files[0]!, + projectRoot, + fts5Enabled, + timeoutEnv, + ), + ]; + } + + const timeoutMs = chunkBudgetMs(files, projectRoot, timeoutEnv); + const input: WorkerInput = { files: [...files], projectRoot, fts5Enabled }; + try { + const output = await session.parse(input, timeoutMs); + return output.results; + } catch (err) { + if (!(err instanceof ParseTimeoutError)) throw err; + const mid = Math.ceil(files.length / 2); + const left = await parseChunkFiles( + session, + files.slice(0, mid), + projectRoot, + fts5Enabled, + timeoutEnv, + ); + const right = await parseChunkFiles( + session, + files.slice(mid), + projectRoot, + fts5Enabled, + timeoutEnv, + ); + return [...left, ...right]; + } +} + +function splitWorkerChunks(filePaths: readonly string[]): string[][] { const chunkSize = Math.ceil(filePaths.length / WORKER_COUNT); const chunks: string[][] = []; for (let i = 0; i < filePaths.length; i += chunkSize) { chunks.push(filePaths.slice(i, i + chunkSize)); } + return chunks; +} + +export function parseFilesParallel(filePaths: string[]): Promise { + if (filePaths.length === 0) return Promise.resolve([]); const projectRoot = getProjectRoot(); const fts5Enabled = getFts5Enabled(); + const timeoutEnv = process.env.CODEMAP_PARSE_TIMEOUT_MS; + if ( + timeoutEnv !== undefined && + timeoutEnv !== "" && + parseParseTimeoutMsOverride(timeoutEnv) === null + ) { + console.error( + `[worker-pool] ignoring invalid CODEMAP_PARSE_TIMEOUT_MS=${JSON.stringify(timeoutEnv)} (expected positive 
integer)`, + ); + } + + if (filePaths.length <= INLINE_PARSE_MAX) { + return Promise.resolve( + parseWorkerInput({ files: [...filePaths], projectRoot, fts5Enabled }) + .results, + ); + } + + const recycleEvery = parseWorkerRecycleEvery(); + const chunks = splitWorkerChunks(filePaths); return Promise.all( - chunks.map( - (chunk) => - new Promise((resolve, reject) => { - const input: WorkerInput = { - files: chunk, - projectRoot, - fts5Enabled, - }; - - if (IS_BUN) { - const worker = new Worker(WORKER_URL_BUN); - worker.onmessage = (event: MessageEvent) => { - resolve(event.data.results); - worker.terminate(); - }; - worker.onerror = (event: ErrorEvent) => { - reject(event.error ?? new Error(event.message)); - worker.terminate(); - }; - worker.postMessage(input); - return; + chunks.map(async (chunk) => { + let session = createParseWorkerSession(); + let processed = 0; + try { + const results: ParsedFile[] = []; + for (let i = 0; i < chunk.length; ) { + const sliceEnd = Math.min(i + recycleEvery, chunk.length); + const slice = chunk.slice(i, sliceEnd); + results.push( + ...(await parseChunkFiles( + session, + slice, + projectRoot, + fts5Enabled, + timeoutEnv, + )), + ); + processed += slice.length; + i = sliceEnd; + if (processed >= recycleEvery && i < chunk.length) { + session.dispose(); + session = createParseWorkerSession(); + processed = 0; } - - const worker = new NodeWorker(NODE_WORKER_PATH, { - type: "module", - } as import("node:worker_threads").WorkerOptions); - worker.on("message", (data: WorkerOutput) => { - resolve(data.results); - void worker.terminate(); - }); - worker.on("error", reject); - worker.postMessage(input); - }), - ), + } + return results; + } finally { + session.dispose(); + } + }), ).then((parts) => parts.flat()); }