Skip to content

Commit 62829be

Browse files
authored
🤖 fix: serialize concurrent bash_output calls to prevent duplicate output (#1093)
Fixes race condition where parallel `bash_output` tool calls on the same process could both read from the same offset before either updates the read position, resulting in duplicate output being returned. ## Root Cause When `disableParallelToolUse` is false (our default), Claude can issue multiple `bash_output` calls in a single response. Without serialization, both calls would: 1. Read `pos.outputBytes = 0` 2. Start async `readOutput(0)` 3. Both read same content 4. Both set `pos.outputBytes` to same final value ## Fix - Add per-process `AsyncMutex` (using existing codebase pattern from `StreamManager`) to serialize `getOutput()` calls - Clean up `readPositions` and `outputLocks` maps during `cleanup()` to prevent stale state ## Test Added test case that calls `getOutput()` twice in parallel and verifies combined output contains each line exactly once (no duplicates). _Generated with `mux`_
1 parent a051c81 commit 62829be

File tree

2 files changed

+68
-21
lines changed

2 files changed

+68
-21
lines changed

‎src/node/services/backgroundProcessManager.test.ts‎

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,50 @@ describe("BackgroundProcessManager", () => {
637637
expect(output.elapsed_ms).toBeGreaterThanOrEqual(250);
638638
expect(output.elapsed_ms).toBeLessThan(1000); // Didn't hang
639639
});
640+
641+
it("should serialize concurrent getOutput calls to prevent duplicate output", async () => {
642+
// This test verifies the fix for the race condition where parallel bash_output
643+
// calls could both read from the same offset before either updates the position.
644+
// Without serialization, both calls would return the same output.
645+
const result = await manager.spawn(
646+
runtime,
647+
testWorkspaceId,
648+
"echo 'line1'; echo 'line2'; echo 'line3'",
649+
{ cwd: process.cwd(), displayName: "test" }
650+
);
651+
652+
expect(result.success).toBe(true);
653+
if (!result.success) return;
654+
655+
// Wait for all output to be written
656+
await new Promise((resolve) => setTimeout(resolve, 200));
657+
658+
// Call getOutput twice in parallel - without serialization, both would
659+
// read from offset 0 and return duplicate "line1\nline2\nline3"
660+
const [output1, output2] = await Promise.all([
661+
manager.getOutput(result.processId),
662+
manager.getOutput(result.processId),
663+
]);
664+
665+
expect(output1.success).toBe(true);
666+
expect(output2.success).toBe(true);
667+
if (!output1.success || !output2.success) return;
668+
669+
// Combine outputs - should contain all lines exactly once
670+
const combinedOutput = output1.output + output2.output;
671+
const line1Count = (combinedOutput.match(/line1/g) ?? []).length;
672+
const line2Count = (combinedOutput.match(/line2/g) ?? []).length;
673+
const line3Count = (combinedOutput.match(/line3/g) ?? []).length;
674+
675+
// Each line should appear exactly once across both outputs (no duplicates)
676+
expect(line1Count).toBe(1);
677+
expect(line2Count).toBe(1);
678+
expect(line3Count).toBe(1);
679+
680+
// One call should get the content, the other should get empty (already read)
681+
const hasContent = output1.output.trim().length > 0 || output2.output.trim().length > 0;
682+
expect(hasContent).toBe(true);
683+
});
640684
});
641685

642686
describe("integration: spawn and getOutput", () => {

‎src/node/services/backgroundProcessManager.ts‎

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Runtime, BackgroundHandle } from "@/node/runtime/Runtime";
22
import { spawnProcess } from "./backgroundProcessExecutor";
33
import { getErrorMessage } from "@/common/utils/errors";
44
import { log } from "./log";
5+
import { AsyncMutex } from "@/node/utils/concurrency/asyncMutex";
56

67
import { EventEmitter } from "events";
78

@@ -20,7 +21,9 @@ export interface BackgroundProcessMeta {
2021
}
2122

2223
/**
23-
* Represents a background process with file-based output
24+
* Represents a background process with file-based output.
25+
* All per-process state is consolidated here so cleanup is automatic when
26+
* the process is removed from the processes map.
2427
*/
2528
export interface BackgroundProcess {
2629
id: string; // Process ID (display_name from the bash tool call)
@@ -36,14 +39,11 @@ export interface BackgroundProcess {
3639
displayName?: string; // Human-readable name (e.g., "Dev Server")
3740
/** True if this process is being waited on (foreground mode) */
3841
isForeground: boolean;
39-
}
40-
41-
/**
42-
* Tracks read position for incremental output retrieval.
43-
* Each call to getOutput() returns only new content since the last read.
44-
*/
45-
interface OutputReadPosition {
46-
outputBytes: number;
42+
/** Tracks read position for incremental output retrieval */
43+
outputBytesRead: number;
44+
/** Mutex to serialize getOutput() calls (prevents race condition when
45+
* parallel tool calls read from same offset before position is updated) */
46+
outputLock: AsyncMutex;
4747
}
4848

4949
/**
@@ -89,11 +89,10 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
8989
// NOTE: This map is in-memory only. Background processes use nohup/setsid so they
9090
// could survive app restarts, but we kill all tracked processes on shutdown via
9191
// dispose(). Rehydrating from meta.json on startup is out of scope for now.
92+
// All per-process state (read position, output lock) is stored in BackgroundProcess
93+
// so cleanup is automatic when the process is removed from this map.
9294
private processes = new Map<string, BackgroundProcess>();
9395

94-
// Tracks read positions for incremental output retrieval
95-
private readPositions = new Map<string, OutputReadPosition>();
96-
9796
// Base directory for process output files
9897
private readonly bgOutputDir: string;
9998
// Tracks foreground processes (started via runtime.exec) that can be backgrounded
@@ -218,6 +217,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
218217
handle,
219218
displayName: config.displayName,
220219
isForeground: config.isForeground ?? false,
220+
outputBytesRead: 0,
221+
outputLock: new AsyncMutex(),
221222
};
222223

223224
// Store process in map
@@ -324,6 +325,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
324325
handle,
325326
displayName,
326327
isForeground: false, // Now in background
328+
outputBytesRead: 0,
329+
outputLock: new AsyncMutex(),
327330
};
328331

329332
// Store process in map
@@ -473,15 +476,13 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
473476
return { success: false, error: `Process not found: ${processId}` };
474477
}
475478

476-
// Get or initialize read position
477-
let pos = this.readPositions.get(processId);
478-
if (!pos) {
479-
pos = { outputBytes: 0 };
480-
this.readPositions.set(processId, pos);
481-
}
479+
// Acquire per-process mutex to serialize concurrent getOutput() calls.
480+
// This prevents race conditions where parallel tool calls both read from
481+
// the same offset before either updates the read position.
482+
await using _lock = await proc.outputLock.acquire();
482483

483484
log.debug(
484-
`BackgroundProcessManager.getOutput: proc.outputDir=${proc.outputDir}, offset=${pos.outputBytes}`
485+
`BackgroundProcessManager.getOutput: proc.outputDir=${proc.outputDir}, offset=${proc.outputBytesRead}`
485486
);
486487

487488
// Pre-compile regex if filter is provided
@@ -514,11 +515,11 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
514515
while (true) {
515516
// Read new content via the handle (works for both local and SSH runtimes)
516517
// Output is already unified in output.log (stdout + stderr via 2>&1)
517-
const result = await proc.handle.readOutput(pos.outputBytes);
518+
const result = await proc.handle.readOutput(proc.outputBytesRead);
518519
accumulatedRaw += result.content;
519520

520521
// Update read position
521-
pos.outputBytes = result.newOffset;
522+
proc.outputBytesRead = result.newOffset;
522523

523524
// Refresh process status
524525
const refreshedProc = await this.getProcess(processId);
@@ -699,6 +700,8 @@ export class BackgroundProcessManager extends EventEmitter<BackgroundProcessMana
699700
await Promise.all(matching.map((p) => this.terminate(p.id)));
700701

701702
// Remove from memory (output dirs left on disk for OS/workspace cleanup)
703+
// All per-process state (outputBytesRead, outputLock) is stored in the
704+
// BackgroundProcess object, so cleanup is automatic when we delete here.
702705
for (const p of matching) {
703706
this.processes.delete(p.id);
704707
}

0 commit comments

Comments
 (0)