Commit fa43e8e

fix(batch): handle locked ReadableStream when retrying batch trigger
When fetch crashes mid-stream during batch item upload (e.g., a connection reset or timeout), the request stream may remain locked by fetch's internal reader. Attempting to cancel a locked stream throws 'Invalid state: ReadableStream is locked', causing the batch operation to fail.

Added a safeStreamCancel() helper that handles locked streams gracefully by catching and ignoring the locked error; the stream is cleaned up by garbage collection once fetch eventually releases the reader.

Fixes a customer issue where batchTrigger failed with a 'ReadableStream is locked' error during network instability.
1 parent 8716752 commit fa43e8e
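
The failure mode described above is easy to reproduce outside of fetch: once a reader is attached to a ReadableStream, calling cancel() on the stream itself rejects with a TypeError. A minimal sketch of that behavior (illustrative only; demoLockedCancel is a made-up name, not part of the commit):

// Run with Node 18+ or any runtime with web streams.
async function demoLockedCancel(): Promise<void> {
  const stream = new ReadableStream<Uint8Array>({
    pull(controller) {
      controller.enqueue(new TextEncoder().encode("chunk"));
    },
  });

  // fetch holds a reader like this internally while uploading a streaming request body.
  const reader = stream.getReader();
  await reader.read();

  try {
    await stream.cancel(); // rejects: the stream is locked to `reader`
  } catch (error) {
    console.error("cancel() on a locked stream:", error); // TypeError
  }
}

void demoLockedCancel();

This is the error that the safeStreamCancel() helper added in this commit swallows, so the retry path can fall back to the backup stream.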

3 files changed (+264, -8 lines)

packages/core/src/v3/apiClient/index.ts

Lines changed: 32 additions & 8 deletions
@@ -444,14 +444,14 @@ export class ApiClient {
 
       if (retryResult.retry) {
         // Cancel the request stream before retry to prevent tee() from buffering
-        await forRequest.cancel();
+        await safeStreamCancel(forRequest);
         await sleep(retryResult.delay);
         // Use the backup stream for retry
         return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
       }
 
       // Not retrying - cancel the backup stream
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
 
       const errText = await response.text().catch((e) => (e as Error).message);
       let errJSON: Object | undefined;
@@ -471,7 +471,7 @@
 
       if (!parsed.success) {
         // Cancel backup stream since we're throwing
-        await forRetry.cancel();
+        await safeStreamCancel(forRetry);
         throw new Error(
           `Invalid response from server for batch ${batchId}: ${parsed.error.message}`
         );
@@ -484,14 +484,14 @@
 
       if (delay) {
         // Cancel the request stream before retry to prevent tee() from buffering
-        await forRequest.cancel();
+        await safeStreamCancel(forRequest);
         // Retry with the backup stream
         await sleep(delay);
         return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
       }
 
       // No more retries - cancel backup stream and throw descriptive error
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
       throw new BatchNotSealedError({
         batchId,
         enqueuedCount: parsed.data.enqueuedCount ?? 0,
@@ -502,7 +502,7 @@
       }
 
       // Success - cancel the backup stream to release resources
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
 
       return parsed.data;
     } catch (error) {
@@ -519,13 +519,13 @@
       const delay = calculateNextRetryDelay(retryOptions, attempt);
       if (delay) {
         // Cancel the request stream before retry to prevent tee() from buffering
-        await forRequest.cancel();
+        await safeStreamCancel(forRequest);
         await sleep(delay);
         return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
       }
 
       // No more retries - cancel the backup stream
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
 
       // Wrap in a more descriptive error
       const cause = error instanceof Error ? error : new Error(String(error));
@@ -1731,6 +1731,30 @@ function sleep(ms: number): Promise<void> {
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
 
+/**
+ * Safely cancels a ReadableStream, handling the case where it might be locked.
+ *
+ * When fetch uses a ReadableStream as a request body and an error occurs mid-transfer
+ * (connection reset, timeout, etc.), the stream may remain locked by fetch's internal reader.
+ * Attempting to cancel a locked stream throws "Invalid state: ReadableStream is locked".
+ *
+ * This function gracefully handles that case by catching the error and doing nothing -
+ * the stream will be cleaned up by garbage collection when the reader is released.
+ */
+async function safeStreamCancel(stream: ReadableStream<unknown>): Promise<void> {
+  try {
+    await stream.cancel();
+  } catch (error) {
+    // Ignore "locked" errors - the stream will be cleaned up when the reader is released.
+    // This happens when fetch crashes mid-read and doesn't release the reader lock.
+    if (error instanceof TypeError && String(error).includes("locked")) {
+      return;
+    }
+    // Re-throw unexpected errors
+    throw error;
+  }
+}
+
 // ============================================================================
 // NDJSON Stream Helpers
 // ============================================================================
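
For context, the comments in these hunks refer to a tee()-based retry scheme: the request body stream is split with tee(), one branch is sent, and the other is kept as a backup for a retry. A rough, self-contained sketch of that pattern using the safeStreamCancel() helper added above (the function name, retry limit, and duplex cast are illustrative, not the client's actual code):

async function postWithStreamRetry(
  url: string,
  body: ReadableStream<Uint8Array>,
  attempt = 0
): Promise<Response> {
  // tee() yields two branches: one to send now, one held back for a possible retry.
  const [forRequest, forRetry] = body.tee();

  try {
    const response = await fetch(url, {
      method: "POST",
      body: forRequest,
      // Node's fetch (undici) requires duplex: "half" for streaming request bodies.
      duplex: "half",
    } as RequestInit);

    if (!response.ok && attempt < 2) {
      // Cancel the branch we just used so tee() stops buffering on its behalf,
      // then recurse with the untouched backup branch.
      await safeStreamCancel(forRequest);
      return postWithStreamRetry(url, forRetry, attempt + 1);
    }

    // Success (or out of retries): release the unused backup branch.
    await safeStreamCancel(forRetry);
    return response;
  } catch (error) {
    if (attempt < 2) {
      // fetch may have crashed while still holding forRequest's reader lock -
      // precisely the case safeStreamCancel() tolerates.
      await safeStreamCancel(forRequest);
      return postWithStreamRetry(url, forRetry, attempt + 1);
    }
    await safeStreamCancel(forRetry);
    throw error;
  }
}

Cancelling the used branch before retrying matters because tee() keeps buffering chunks for an unread branch; dropping it promptly is what keeps repeated retries from accumulating memory, which the existing tee-branch test in the next file exercises.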

packages/core/src/v3/apiClient/streamBatchItems.test.ts

Lines changed: 52 additions & 0 deletions
@@ -398,6 +398,58 @@ describe("streamBatchItems stream cancellation on retry", () => {
     expect(cancelCallCount).toBeGreaterThanOrEqual(1);
   });
 
+  it("handles locked stream when connection error occurs mid-read", async () => {
+    // This test simulates the real-world scenario where fetch throws an error
+    // while still holding the reader lock on the request body stream.
+    // This can happen with connection resets, timeouts, or network failures.
+    let callIndex = 0;
+
+    const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
+      const currentAttempt = callIndex;
+      callIndex++;
+
+      if (init?.body && init.body instanceof ReadableStream) {
+        if (currentAttempt === 0) {
+          // First attempt: Get a reader and start reading, but throw while still holding the lock.
+          // This simulates a connection error that happens mid-transfer.
+          const reader = init.body.getReader();
+          await reader.read(); // Start reading
+          // DON'T release the lock - this simulates fetch crashing mid-read
+          throw new TypeError("Connection reset by peer");
+        }
+
+        // Subsequent attempts: consume and release normally
+        await consumeAndRelease(init.body);
+      }
+
+      // Second attempt: success
+      return {
+        ok: true,
+        json: () =>
+          Promise.resolve({
+            id: "batch_test123",
+            itemsAccepted: 10,
+            itemsDeduplicated: 0,
+            sealed: true,
+          }),
+      };
+    });
+    globalThis.fetch = mockFetch;
+
+    const client = new ApiClient("http://localhost:3030", "tr_test_key");
+
+    // This should NOT throw "ReadableStream is locked" error
+    // Instead it should gracefully handle the locked stream and retry
+    const result = await client.streamBatchItems(
+      "batch_test123",
+      [{ index: 0, task: "test-task", payload: "{}" }],
+      { retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
+    );
+
+    expect(result.sealed).toBe(true);
+    expect(mockFetch).toHaveBeenCalledTimes(2);
+  });
+
   it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => {
     let cancelCallCount = 0;
     let callIndex = 0;
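
The consumeAndRelease() helper called by the new test is defined elsewhere in the test file and is not shown in this diff. For readers of this hunk, a plausible sketch of such a helper (an assumption, not the file's actual implementation):

async function consumeAndRelease(stream: ReadableStream<unknown>): Promise<void> {
  const reader = stream.getReader();
  try {
    // Drain the body the way a well-behaved fetch implementation would.
    while (true) {
      const { done } = await reader.read();
      if (done) break;
    }
  } finally {
    // Release the lock so the stream can still be cancelled afterwards.
    reader.releaseLock();
  }
}
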
batch-concurrency-cleaner.sh

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
+#!/bin/bash
+#
+# Batch Queue Concurrency Cleaner
+#
+# Detects and cleans up stale concurrency entries that block batch processing.
+# This is a workaround for a bug where visibility timeout reclaims don't release concurrency.
+#
+# Uses a Lua script for ATOMIC detection of stale entries - no race conditions.
+#
+# Usage:
+#   ./batch-concurrency-cleaner.sh --read-redis <url> --write-redis <url> [--delay <seconds>] [--dry-run]
+#
+
+set -e
+
+# Defaults
+DELAY=10
+DRY_RUN=false
+READ_REDIS=""
+WRITE_REDIS=""
+
+# Parse arguments
+while [[ $# -gt 0 ]]; do
+  case $1 in
+    --read-redis)
+      READ_REDIS="$2"
+      shift 2
+      ;;
+    --write-redis)
+      WRITE_REDIS="$2"
+      shift 2
+      ;;
+    --delay)
+      DELAY="$2"
+      shift 2
+      ;;
+    --dry-run)
+      DRY_RUN=true
+      shift
+      ;;
+    -h|--help)
+      echo "Usage: $0 --read-redis <url> --write-redis <url> [--delay <seconds>] [--dry-run]"
+      exit 0
+      ;;
+    *)
+      echo "Unknown option: $1"
+      exit 1
+      ;;
+  esac
+done
+
+if [[ -z "$READ_REDIS" ]] || [[ -z "$WRITE_REDIS" ]]; then
+  echo "Error: --read-redis and --write-redis are required"
+  exit 1
+fi
+
+echo "Batch Queue Concurrency Cleaner (Atomic Version)"
+echo "================================================="
+echo "Read Redis: ${READ_REDIS:0:30}..."
+echo "Write Redis: ${WRITE_REDIS:0:30}..."
+echo "Delay: ${DELAY}s"
+echo "Dry run: $DRY_RUN"
+echo ""
+
+rcli_read() {
+  redis-cli -u "$READ_REDIS" --no-auth-warning "$@" 2>/dev/null
+}
+
+rcli_write() {
+  redis-cli -u "$WRITE_REDIS" --no-auth-warning "$@" 2>/dev/null
+}
+
+# Lua script that ATOMICALLY checks for stale concurrency entries
+# KEYS[1] = concurrency key to check
+# KEYS[2-13] = in-flight data hash keys for shards 0-11
+# Returns: list of stale messageIds (not in any in-flight hash)
+FIND_STALE_LUA='
+local concurrency_key = KEYS[1]
+local stale = {}
+
+-- Get all members of the concurrency set
+local members = redis.call("SMEMBERS", concurrency_key)
+
+for _, msg_id in ipairs(members) do
+  local found = false
+  -- Check each in-flight shard (KEYS[2] through KEYS[13])
+  for i = 2, 13 do
+    if redis.call("HEXISTS", KEYS[i], msg_id) == 1 then
+      found = true
+      break
+    end
+  end
+  if not found then
+    table.insert(stale, msg_id)
+  end
+end
+
+return stale
+'
+
+# Build the in-flight keys array (used in every Lua call)
+INFLIGHT_KEYS="engine:batch:inflight:0:data"
+for shard in 1 2 3 4 5 6 7 8 9 10 11; do
+  INFLIGHT_KEYS="$INFLIGHT_KEYS engine:batch:inflight:$shard:data"
+done
+
+# Main loop
+while true; do
+  ts=$(date '+%H:%M:%S')
+
+  # Get master queue total and in-flight count for status display
+  master_total=0
+  for i in 0 1 2 3 4 5 6 7 8 9 10 11; do
+    count=$(rcli_read ZCARD "engine:batch:master:$i")
+    master_total=$((master_total + count))
+  done
+
+  inflight_total=0
+  for i in 0 1 2 3 4 5 6 7 8 9 10 11; do
+    count=$(rcli_read HLEN "engine:batch:inflight:$i:data")
+    inflight_total=$((inflight_total + count))
+  done
+
+  # Scan for concurrency keys
+  cursor=0
+  total_stale=0
+  cleaned_tenants=0
+
+  while true; do
+    scan_output=$(rcli_read SCAN $cursor MATCH 'engine:batch:concurrency:tenant:*' COUNT 1000)
+    cursor=$(echo "$scan_output" | head -1)
+    keys=$(echo "$scan_output" | tail -n +2)
+
+    while IFS= read -r conc_key; do
+      [[ -z "$conc_key" ]] && continue
+
+      # ATOMIC check: Run Lua script to find stale entries
+      # 13 keys total: 1 concurrency key + 12 in-flight keys
+      stale_ids=$(rcli_read EVAL "$FIND_STALE_LUA" 13 "$conc_key" $INFLIGHT_KEYS)
+
+      # Count stale entries
+      stale_count=0
+      stale_array=()
+      while IFS= read -r stale_id; do
+        [[ -z "$stale_id" ]] && continue
+        stale_array+=("$stale_id")
+        stale_count=$((stale_count + 1))
+      done <<< "$stale_ids"
+
+      if [[ $stale_count -gt 0 ]]; then
+        tenant="${conc_key#engine:batch:concurrency:tenant:}"
+        total_stale=$((total_stale + stale_count))
+
+        if [[ "$DRY_RUN" == "true" ]]; then
+          echo "[$ts] STALE (dry-run): $tenant ($stale_count entries)"
+          for sid in "${stale_array[@]}"; do
+            echo " - $sid"
+          done
+        else
+          # Remove each stale entry individually with SREM (idempotent, safe)
+          for sid in "${stale_array[@]}"; do
+            rcli_write SREM "$conc_key" "$sid" >/dev/null
+          done
+          echo "[$ts] CLEANED: $tenant ($stale_count stale entries removed)"
+          cleaned_tenants=$((cleaned_tenants + 1))
+        fi
+      fi
+    done <<< "$keys"
+
+    [[ "$cursor" == "0" ]] && break
+  done
+
+  if [[ "$DRY_RUN" == "true" ]]; then
+    echo "[$ts] in-flight=$inflight_total master-queue=$master_total stale-found=$total_stale"
+  else
+    echo "[$ts] in-flight=$inflight_total master-queue=$master_total cleaned=$cleaned_tenants"
+  fi
+
+  sleep "$DELAY"
+done
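
The same atomic check can also be driven from Node.js instead of redis-cli. A sketch using ioredis (hypothetical tooling, not part of this commit; the Lua body is adapted from the script above to accept any number of in-flight keys):

import Redis from "ioredis";

const FIND_STALE_LUA = `
local stale = {}
local members = redis.call("SMEMBERS", KEYS[1])
for _, msg_id in ipairs(members) do
  local found = false
  for i = 2, #KEYS do
    if redis.call("HEXISTS", KEYS[i], msg_id) == 1 then
      found = true
      break
    end
  end
  if not found then
    table.insert(stale, msg_id)
  end
end
return stale
`;

async function findStaleEntries(redis: Redis, tenant: string): Promise<string[]> {
  const concurrencyKey = `engine:batch:concurrency:tenant:${tenant}`;
  const inflightKeys = Array.from({ length: 12 }, (_, i) => `engine:batch:inflight:${i}:data`);
  const keys = [concurrencyKey, ...inflightKeys];

  // EVAL runs the whole membership check on the Redis server in one step,
  // so an entry cannot move between "in flight" and "done" halfway through the scan.
  return (await redis.eval(FIND_STALE_LUA, keys.length, ...keys)) as string[];
}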
