Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ export function useChat(

const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
const processSSEStreamRef = useRef<
(reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => Promise<void>
(
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number
) => Promise<void>
>(async () => {})
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})

Expand Down Expand Up @@ -379,7 +383,8 @@ export function useChat(
}

appliedChatIdRef.current = chatHistory.id
setMessages(chatHistory.messages.map(mapStoredMessage))
const mappedMessages = chatHistory.messages.map(mapStoredMessage)
setMessages(mappedMessages)

if (chatHistory.resources.length > 0) {
setResources(chatHistory.resources)
Expand All @@ -392,6 +397,7 @@ export function useChat(
}

if (activeStreamId && !sendingRef.current) {
abortControllerRef.current?.abort()
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
Expand Down Expand Up @@ -461,7 +467,7 @@ export function useChat(
},
})

await processSSEStreamRef.current(combinedStream.getReader(), assistantId)
await processSSEStreamRef.current(combinedStream.getReader(), assistantId, gen)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
} finally {
Expand Down Expand Up @@ -489,7 +495,11 @@ export function useChat(
}, [activeResourceId, resources])

const processSSEStream = useCallback(
async (reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => {
async (
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number
) => {
const decoder = new TextDecoder()
let buffer = ''
const blocks: ContentBlock[] = []
Expand All @@ -511,10 +521,14 @@ export function useChat(
return b
}

const isStale = () => expectedGen !== undefined && streamGenRef.current !== expectedGen

const flush = () => {
if (isStale()) return
streamingBlocksRef.current = [...blocks]
const snapshot = { content: runningText, contentBlocks: [...blocks] }
setMessages((prev) => {
if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev
const idx = prev.findIndex((m) => m.id === assistantId)
if (idx >= 0) {
return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
Expand All @@ -524,6 +538,10 @@ export function useChat(
}

while (true) {
if (isStale()) {
reader.cancel().catch(() => {})
break
}
const { done, value } = await reader.read()
if (done) break

Expand Down Expand Up @@ -975,15 +993,15 @@ export function useChat(
content: message,
...(storedAttachments && { fileAttachments: storedAttachments }),
}
queryClient.setQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current), (old) =>
old
queryClient.setQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current), (old) => {
return old
? {
...old,
messages: [...old.messages, cachedUserMsg],
activeStreamId: userMessageId,
}
: undefined
)
})
}

const userAttachments = storedAttachments?.map(toDisplayAttachment)
Expand Down Expand Up @@ -1049,7 +1067,7 @@ export function useChat(

if (!response.body) throw new Error('No response body')

await processSSEStream(response.body.getReader(), assistantId)
await processSSEStream(response.body.getReader(), assistantId, gen)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
setError(err instanceof Error ? err.message : 'Failed to send message')
Expand Down Expand Up @@ -1180,6 +1198,8 @@ export function useChat(

useEffect(() => {
return () => {
abortControllerRef.current?.abort()
abortControllerRef.current = null
streamGenRef.current++
sendingRef.current = false
}
Expand Down
Loading