Skip to content

Commit c29447a

Browse files
fix(executor): skip content persistence when stream consumer exits early
Previously, if the onStream consumer caught an internal error without re-throwing, the block-executor would treat the shortened accumulator as the complete response, persist a truncated string to memory via appendToMemory, and set it as executionOutput.content. Track whether the source ReadableStream actually closed (done=true) in the pull handler. If onStream returns before the source drains, skip content persistence and log a warning — the old tee()-based flow was immune to this because the executor branch drained independently of the client branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 43a727d commit c29447a

1 file changed

Lines changed: 13 additions & 0 deletions

File tree

apps/sim/executor/execution/block-executor.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ export class BlockExecutor {
618618
const decoder = new TextDecoder()
619619
const accumulated: string[] = []
620620
let drainError: unknown
621+
let sourceFullyDrained = false
621622

622623
const clientSource = new ReadableStream<Uint8Array>({
623624
async pull(controller) {
@@ -626,6 +627,7 @@ export class BlockExecutor {
626627
if (done) {
627628
const tail = decoder.decode()
628629
if (tail) accumulated.push(tail)
630+
sourceFullyDrained = true
629631
controller.close()
630632
return
631633
}
@@ -669,6 +671,17 @@ export class BlockExecutor {
669671
return
670672
}
671673

674+
// If the onStream consumer exited before the source drained (e.g. it caught
675+
// an internal error and returned normally), `accumulated` holds a truncated
676+
// response. Persisting that to memory or setting it as the block output
677+
// would corrupt downstream state — skip and log instead.
678+
if (!sourceFullyDrained) {
679+
this.execLogger.warn('Stream consumer exited before source drained; skipping content persistence', {
680+
blockId,
681+
})
682+
return
683+
}
684+
672685
const fullContent = accumulated.join('')
673686
if (fullContent) {
674687
const executionOutput = streamingExec.execution?.output

0 commit comments

Comments
 (0)