diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 01cfc0e73d..e527144fdc 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -279,7 +279,11 @@ export function useChat( const sendMessageRef = useRef(async () => {}) const processSSEStreamRef = useRef< - (reader: ReadableStreamDefaultReader, assistantId: string) => Promise + ( + reader: ReadableStreamDefaultReader, + assistantId: string, + expectedGen?: number + ) => Promise >(async () => {}) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) @@ -379,7 +383,8 @@ export function useChat( } appliedChatIdRef.current = chatHistory.id - setMessages(chatHistory.messages.map(mapStoredMessage)) + const mappedMessages = chatHistory.messages.map(mapStoredMessage) + setMessages(mappedMessages) if (chatHistory.resources.length > 0) { setResources(chatHistory.resources) @@ -392,6 +397,7 @@ export function useChat( } if (activeStreamId && !sendingRef.current) { + abortControllerRef.current?.abort() const gen = ++streamGenRef.current const abortController = new AbortController() abortControllerRef.current = abortController @@ -461,7 +467,7 @@ export function useChat( }, }) - await processSSEStreamRef.current(combinedStream.getReader(), assistantId) + await processSSEStreamRef.current(combinedStream.getReader(), assistantId, gen) } catch (err) { if (err instanceof Error && err.name === 'AbortError') return } finally { @@ -489,7 +495,11 @@ export function useChat( }, [activeResourceId, resources]) const processSSEStream = useCallback( - async (reader: ReadableStreamDefaultReader, assistantId: string) => { + async ( + reader: ReadableStreamDefaultReader, + assistantId: string, + expectedGen?: number + ) => { const decoder = new TextDecoder() let buffer = '' const blocks: ContentBlock[] = [] @@ -511,10 +521,14 @@ export function useChat( return b } + const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen + const flush = () => { + if (isStale()) return streamingBlocksRef.current = [...blocks] const snapshot = { content: runningText, contentBlocks: [...blocks] } setMessages((prev) => { + if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev const idx = prev.findIndex((m) => m.id === assistantId) if (idx >= 0) { return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) @@ -524,6 +538,10 @@ export function useChat( } while (true) { + if (isStale()) { + reader.cancel().catch(() => {}) + break + } const { done, value } = await reader.read() if (done) break @@ -975,15 +993,15 @@ export function useChat( content: message, ...(storedAttachments && { fileAttachments: storedAttachments }), } - queryClient.setQueryData(taskKeys.detail(chatIdRef.current), (old) => - old + queryClient.setQueryData(taskKeys.detail(chatIdRef.current), (old) => { + return old ? { ...old, messages: [...old.messages, cachedUserMsg], activeStreamId: userMessageId, } : undefined - ) + }) } const userAttachments = storedAttachments?.map(toDisplayAttachment) @@ -1049,7 +1067,7 @@ export function useChat( if (!response.body) throw new Error('No response body') - await processSSEStream(response.body.getReader(), assistantId) + await processSSEStream(response.body.getReader(), assistantId, gen) } catch (err) { if (err instanceof Error && err.name === 'AbortError') return setError(err instanceof Error ? err.message : 'Failed to send message') @@ -1180,6 +1198,8 @@ export function useChat( useEffect(() => { return () => { + abortControllerRef.current?.abort() + abortControllerRef.current = null streamGenRef.current++ sendingRef.current = false }