Skip to content

Commit 6e7c70b

Browse files
committed
fix
1 parent 509369d commit 6e7c70b

File tree

1 file changed

+94
-51
lines changed

1 file changed

+94
-51
lines changed

apps/sim/executor/execution/block-executor.ts

Lines changed: 94 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
22
import { mcpServers } from '@sim/db/schema'
33
import { and, eq, inArray, isNull } from 'drizzle-orm'
44
import { getBaseUrl } from '@/lib/core/utils/urls'
5+
import { isCancellationRequested } from '@/lib/execution/cancellation'
56
import { createLogger } from '@/lib/logs/console/logger'
67
import {
78
BlockType,
@@ -32,6 +33,8 @@ import type { SubflowType } from '@/stores/workflows/workflow/types'
3233

3334
const logger = createLogger('BlockExecutor')
3435

36+
const CANCELLATION_CHECK_INTERVAL_MS = 1000
37+
3538
export class BlockExecutor {
3639
constructor(
3740
private blockHandlers: BlockHandler[],
@@ -548,10 +551,16 @@ export class BlockExecutor {
548551
return
549552
}
550553

551-
const [clientStream, executorStream] = stream.tee()
554+
const { clientStream: controlledClientStream, consume } = this.createControlledStream(
555+
ctx,
556+
stream,
557+
blockId,
558+
responseFormat,
559+
streamingExec
560+
)
552561

553562
const processedClientStream = streamingResponseFormatProcessor.processStream(
554-
clientStream,
563+
controlledClientStream,
555564
blockId,
556565
selectedOutputs,
557566
responseFormat
@@ -562,13 +571,6 @@ export class BlockExecutor {
562571
stream: processedClientStream,
563572
}
564573

565-
const executorConsumption = this.consumeExecutorStream(
566-
executorStream,
567-
streamingExec,
568-
blockId,
569-
responseFormat
570-
)
571-
572574
const clientConsumption = (async () => {
573575
try {
574576
await ctx.onStream?.(clientStreamingExec)
@@ -577,7 +579,7 @@ export class BlockExecutor {
577579
}
578580
})()
579581

580-
await Promise.all([clientConsumption, executorConsumption])
582+
await Promise.all([clientConsumption, consume()])
581583
}
582584

583585
private async forwardStream(
@@ -605,57 +607,98 @@ export class BlockExecutor {
605607
}
606608
}
607609

608-
private async consumeExecutorStream(
609-
stream: ReadableStream,
610-
streamingExec: { execution: any },
610+
private createControlledStream(
611+
ctx: ExecutionContext,
612+
sourceStream: ReadableStream,
611613
blockId: string,
612-
responseFormat: any
613-
): Promise<void> {
614-
const reader = stream.getReader()
615-
const decoder = new TextDecoder()
614+
responseFormat: any,
615+
streamingExec: { execution: any }
616+
): { clientStream: ReadableStream; consume: () => Promise<void> } {
617+
let clientController: ReadableStreamDefaultController<Uint8Array> | null = null
616618
let fullContent = ''
617619

618-
try {
619-
while (true) {
620-
const { done, value } = await reader.read()
621-
if (done) break
622-
fullContent += decoder.decode(value, { stream: true })
623-
}
624-
} catch (error) {
625-
logger.error('Error reading executor stream for block', { blockId, error })
626-
} finally {
627-
try {
628-
reader.releaseLock()
629-
} catch {}
630-
}
631-
632-
if (!fullContent) {
633-
return
634-
}
620+
const clientStream = new ReadableStream<Uint8Array>({
621+
start(controller) {
622+
clientController = controller
623+
},
624+
})
635625

636-
const executionOutput = streamingExec.execution?.output
637-
if (!executionOutput || typeof executionOutput !== 'object') {
638-
return
639-
}
626+
const consume = async () => {
627+
const reader = sourceStream.getReader()
628+
const decoder = new TextDecoder()
629+
let lastCancellationCheck = Date.now()
640630

641-
if (responseFormat) {
642631
try {
643-
const parsed = JSON.parse(fullContent.trim())
644-
645-
streamingExec.execution.output = {
646-
...parsed,
647-
tokens: executionOutput.tokens,
648-
toolCalls: executionOutput.toolCalls,
649-
providerTiming: executionOutput.providerTiming,
650-
cost: executionOutput.cost,
651-
model: executionOutput.model,
632+
while (true) {
633+
const now = Date.now()
634+
if (ctx.executionId && now - lastCancellationCheck >= CANCELLATION_CHECK_INTERVAL_MS) {
635+
lastCancellationCheck = now
636+
const cancelled = await isCancellationRequested(ctx.executionId)
637+
if (cancelled) {
638+
ctx.isCancelled = true
639+
try {
640+
clientController?.close()
641+
} catch {}
642+
reader.cancel()
643+
break
644+
}
645+
}
646+
647+
const { done, value } = await reader.read()
648+
if (done) {
649+
try {
650+
clientController?.close()
651+
} catch {}
652+
break
653+
}
654+
655+
fullContent += decoder.decode(value, { stream: true })
656+
try {
657+
clientController?.enqueue(value)
658+
} catch {}
652659
}
653-
return
654660
} catch (error) {
655-
logger.warn('Failed to parse streamed content for response format', { blockId, error })
661+
if (!ctx.isCancelled) {
662+
logger.error('Error reading stream for block', { blockId, error })
663+
}
664+
try {
665+
clientController?.close()
666+
} catch {}
667+
} finally {
668+
try {
669+
reader.releaseLock()
670+
} catch {}
671+
}
672+
673+
if (!fullContent) {
674+
return
675+
}
676+
677+
const executionOutput = streamingExec.execution?.output
678+
if (!executionOutput || typeof executionOutput !== 'object') {
679+
return
656680
}
681+
682+
if (responseFormat) {
683+
try {
684+
const parsed = JSON.parse(fullContent.trim())
685+
streamingExec.execution.output = {
686+
...parsed,
687+
tokens: executionOutput.tokens,
688+
toolCalls: executionOutput.toolCalls,
689+
providerTiming: executionOutput.providerTiming,
690+
cost: executionOutput.cost,
691+
model: executionOutput.model,
692+
}
693+
return
694+
} catch (error) {
695+
logger.warn('Failed to parse streamed content for response format', { blockId, error })
696+
}
697+
}
698+
699+
executionOutput.content = fullContent
657700
}
658701

659-
executionOutput.content = fullContent
702+
return { clientStream, consume }
660703
}
661704
}

0 commit comments

Comments
 (0)