Skip to content

Commit 6b4e3dd

Browse files
committed
Accumulate messages in the task, allowing us to only have to send user messages from the transport
1 parent d7817e0 commit 6b4e3dd

File tree

3 files changed

+299
-125
lines changed

3 files changed

+299
-125
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 84 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,15 @@ type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage> = {
188188
*
189189
* - `messages` contains model-ready messages (converted via `convertToModelMessages`) —
190190
* pass these directly to `streamText`.
191-
* - `uiMessages` contains the raw `UIMessage[]` from the frontend.
192191
* - `clientData` contains custom data from the frontend (the `metadata` field from `sendMessage()`).
192+
*
193+
* The backend accumulates the full conversation history across turns, so the frontend
194+
* only needs to send new messages after the first turn.
193195
*/
194196
export type ChatTaskPayload = {
195197
/** Model-ready messages — pass directly to `streamText({ messages })`. */
196198
messages: ModelMessage[];
197199

198-
/** Raw UI messages from the frontend. */
199-
uiMessages: UIMessage[];
200-
201200
/** The unique identifier for the chat session */
202201
chatId: string;
203202

@@ -237,28 +236,6 @@ export type ChatTaskRunPayload = ChatTaskPayload & ChatTaskSignals;
237236
const messagesInput = streams.input<ChatTaskWirePayload>({ id: CHAT_MESSAGES_STREAM_ID });
238237
const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STOP_STREAM_ID });
239238

240-
/**
241-
* Strips provider-specific IDs from message parts so that partial/stopped
242-
* assistant responses don't cause 404s when sent back to the provider
243-
* (e.g. OpenAI Responses API message IDs).
244-
* @internal
245-
*/
246-
function sanitizeMessages<TMessage extends UIMessage>(messages: TMessage[]): TMessage[] {
247-
return messages.map((msg) => {
248-
if (msg.role !== "assistant" || !msg.parts) return msg;
249-
return {
250-
...msg,
251-
parts: msg.parts.map((part: any) => {
252-
// Strip provider-specific metadata (e.g. OpenAI Responses API itemId)
253-
// and streaming state from assistant message parts. These cause 404s
254-
// when partial/stopped responses are sent back to the provider.
255-
const { providerMetadata, state, id, ...rest } = part;
256-
return rest;
257-
}),
258-
};
259-
});
260-
}
261-
262239
/**
263240
* Tracks how many times `pipeChat` has been called in the current `chatTask` run.
264241
* Used to prevent double-piping when a user both calls `pipeChat()` manually
@@ -496,6 +473,11 @@ function chatTask<TIdentifier extends string>(
496473

497474
let currentWirePayload = payload;
498475

476+
// Accumulated model messages across turns. Turn 1 initialises from the
477+
// full history the frontend sends; subsequent turns append only the new
478+
// user message(s) and the captured assistant response.
479+
let accumulatedMessages: ModelMessage[] = [];
480+
499481
// Mutable reference to the current turn's stop controller so the
500482
// stop input stream listener (registered once) can abort the right turn.
501483
let currentStopController: AbortController | undefined;
@@ -562,25 +544,45 @@ function chatTask<TIdentifier extends string>(
562544
pendingMessages.push(msg);
563545
});
564546

565-
// Convert wire payload to user-facing payload
566-
const sanitized = sanitizeMessages(uiMessages);
567-
const modelMessages = await convertToModelMessages(sanitized);
547+
// Convert the incoming UIMessages to model messages and update the accumulator.
548+
// Turn 1: full history from the frontend → replaces the accumulator.
549+
// Turn 2+: only the new message(s) → appended to the accumulator.
550+
const incomingModelMessages = await convertToModelMessages(uiMessages);
551+
552+
if (turn === 0) {
553+
accumulatedMessages = incomingModelMessages;
554+
} else if (currentWirePayload.trigger === "regenerate-message") {
555+
// Regenerate: frontend sent full history with last assistant message
556+
// removed. Reset the accumulator to match.
557+
accumulatedMessages = incomingModelMessages;
558+
} else {
559+
// Submit: frontend sent only the new user message(s). Append to accumulator.
560+
accumulatedMessages.push(...incomingModelMessages);
561+
}
562+
563+
// Captured by the onFinish callback below — works even on abort/stop.
564+
let capturedResponseMessage: UIMessage | undefined;
568565

569566
try {
570567
const result = await userRun({
571568
...restWire,
572-
messages: modelMessages,
573-
uiMessages: sanitized,
569+
messages: accumulatedMessages,
574570
clientData: wireMetadata,
575571
signal: combinedSignal,
576572
cancelSignal,
577573
stopSignal,
578574
});
579575

580576
// Auto-pipe if the run function returned a StreamTextResult or similar,
581-
// but only if pipeChat() wasn't already called manually during this turn
577+
// but only if pipeChat() wasn't already called manually during this turn.
578+
// We call toUIMessageStream ourselves to attach onFinish for response capture.
582579
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
583-
await pipeChat(result, { signal: combinedSignal, spanName: "stream response" });
580+
const uiStream = result.toUIMessageStream({
581+
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
582+
capturedResponseMessage = responseMessage;
583+
},
584+
});
585+
await pipeChat(uiStream, { signal: combinedSignal, spanName: "stream response" });
584586
}
585587
} catch (error) {
586588
// Handle AbortError from streamText gracefully
@@ -596,6 +598,24 @@ function chatTask<TIdentifier extends string>(
596598
msgSub.off();
597599
}
598600

601+
// Append the assistant's response (partial or complete) to the accumulator.
602+
// The onFinish callback fires even on abort/stop, so partial responses
603+
// from stopped generation are captured correctly.
604+
if (capturedResponseMessage) {
605+
try {
606+
const responseModelMessages = await convertToModelMessages([
607+
stripProviderMetadata(capturedResponseMessage),
608+
]);
609+
accumulatedMessages.push(...responseModelMessages);
610+
} catch {
611+
// Conversion failed — skip accumulation for this turn
612+
}
613+
}
614+
// TODO: When the user calls `pipeChat` manually instead of returning a
615+
// StreamTextResult, we don't have access to onFinish. A future iteration
616+
// should let manual-mode users report back response messages for
617+
// accumulation (e.g. via a `chat.addMessages()` helper).
618+
599619
if (runSignal.aborted) return "exit";
600620

601621
// Write turn-complete control chunk so frontend closes its stream
@@ -723,3 +743,34 @@ function extractLastUserMessageText(messages: UIMessage[]): string | undefined {
723743

724744
return undefined;
725745
}
746+
747+
/**
748+
* Strips ephemeral OpenAI Responses API `itemId` from a UIMessage's parts.
749+
*
750+
* The OpenAI Responses provider attaches `itemId` to message parts via
751+
* `providerMetadata.openai.itemId`. These IDs are ephemeral — sending them
752+
* back in a subsequent `streamText` call causes 404s because the provider
753+
* can't find the referenced item (especially for stopped/partial responses).
754+
*
755+
* @internal
756+
*/
757+
function stripProviderMetadata(message: UIMessage): UIMessage {
758+
if (!message.parts) return message;
759+
return {
760+
...message,
761+
parts: message.parts.map((part: any) => {
762+
const openai = part.providerMetadata?.openai;
763+
if (!openai?.itemId) return part;
764+
765+
const { itemId, ...restOpenai } = openai;
766+
const { openai: _, ...restProviders } = part.providerMetadata;
767+
return {
768+
...part,
769+
providerMetadata: {
770+
...restProviders,
771+
...(Object.keys(restOpenai).length > 0 ? { openai: restOpenai } : {}),
772+
},
773+
};
774+
}),
775+
};
776+
}

0 commit comments

Comments
 (0)