Skip to content

Commit 95aba16

Browse files
committed
feat(chat): add chat.defer(), preload toggle, TTFB measurement, and fix ChatTaskWirePayload export
1 parent fb4a699 commit 95aba16

File tree

7 files changed

+136
-26
lines changed

7 files changed

+136
-26
lines changed

docs/guides/ai-chat.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1387,6 +1387,7 @@ See [onTurnComplete](#onturncomplete) for the full field reference.
13871387
| `chat.setTurnTimeout(duration)` | Override turn timeout at runtime (e.g. `"2h"`) |
13881388
| `chat.setTurnTimeoutInSeconds(seconds)` | Override turn timeout at runtime (in seconds) |
13891389
| `chat.setWarmTimeoutInSeconds(seconds)` | Override warm timeout at runtime |
1390+
| `chat.defer(promise)` | Run background work in parallel with streaming, awaited (up to 5 s) before `onTurnComplete` |
13901391
| `chat.isStopped()` | Check if the current turn was stopped by the user (works anywhere during a turn) |
13911392
| `chat.cleanupAbortedParts(message)` | Remove incomplete parts from a stopped response message |
13921393
| `chat.stream` | Typed chat output stream — use `.writer()`, `.pipe()`, `.append()`, `.read()` |

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,8 @@ const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
309309
/**
310310
* The wire payload shape sent by `TriggerChatTransport`.
311311
* Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.
312-
* @internal
313312
*/
314-
type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage, TMetadata = unknown> = {
313+
export type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage, TMetadata = unknown> = {
315314
messages: TMessage[];
316315
chatId: string;
317316
trigger: "submit-message" | "regenerate-message" | "preload";
@@ -384,6 +383,13 @@ export type ChatTaskRunPayload<TClientData = unknown> = ChatTaskPayload<TClientD
384383
const messagesInput = streams.input<ChatTaskWirePayload>({ id: CHAT_MESSAGES_STREAM_ID });
385384
const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STOP_STREAM_ID });
386385

386+
/**
387+
* Per-turn deferred promises. Registered via `chat.defer()`, awaited
388+
* before `onTurnComplete` fires. Reset each turn.
389+
* @internal
390+
*/
391+
const chatDeferKey = locals.create<Set<Promise<unknown>>>("chat.defer");
392+
387393
/**
388394
* Run-scoped pipe counter. Stored in locals so concurrent runs in the
389395
* same worker don't share state.
@@ -1016,6 +1022,7 @@ function chatTask<
10161022
`chat turn ${turn + 1}`,
10171023
async () => {
10181024
locals.set(chatPipeCountKey, 0);
1025+
locals.set(chatDeferKey, new Set());
10191026

10201027
// Store chat context for auto-detection by ai.tool subtasks
10211028
locals.set(chatTurnContextKey, {
@@ -1270,6 +1277,16 @@ function chatTask<
12701277
turnAccessToken
12711278
);
12721279

1280+
// Await deferred background work (e.g. DB writes from onTurnStart)
1281+
// before firing onTurnComplete so hooks can rely on the work being done.
1282+
const deferredWork = locals.get(chatDeferKey);
1283+
if (deferredWork && deferredWork.size > 0) {
1284+
await Promise.race([
1285+
Promise.allSettled(deferredWork),
1286+
new Promise<void>((r) => setTimeout(r, 5_000)),
1287+
]);
1288+
}
1289+
12731290
// Fire onTurnComplete after response capture
12741291
if (onTurnComplete) {
12751292
await tracer.startActiveSpan(
@@ -1487,6 +1504,32 @@ function isStopped(): boolean {
14871504
return controller?.signal.aborted ?? false;
14881505
}
14891506

1507+
// ---------------------------------------------------------------------------
1508+
// Per-turn deferred work
1509+
// ---------------------------------------------------------------------------
1510+
1511+
/**
1512+
* Register a promise that runs in the background during the current turn.
1513+
*
1514+
* Use this to move non-blocking work (DB writes, analytics, etc.) out of
1515+
* the critical path. The promise runs in parallel with streaming and is
1516+
* awaited (with a 5 s timeout) before `onTurnComplete` fires.
1517+
*
1518+
* @example
1519+
* ```ts
1520+
* onTurnStart: async ({ chatId, uiMessages }) => {
1521+
* // Persist messages without blocking the LLM call
1522+
* chat.defer(db.chat.update({ where: { id: chatId }, data: { messages: uiMessages } }));
1523+
* },
1524+
* ```
1525+
*/
1526+
function chatDefer(promise: Promise<unknown>): void {
1527+
const promises = locals.get(chatDeferKey);
1528+
if (promises) {
1529+
promises.add(promise);
1530+
}
1531+
}
1532+
14901533
// ---------------------------------------------------------------------------
14911534
// Aborted message cleanup
14921535
// ---------------------------------------------------------------------------
@@ -1806,6 +1849,8 @@ export const chat = {
18061849
isStopped,
18071850
/** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */
18081851
cleanupAbortedParts,
1852+
/** Register background work that runs in parallel with streaming. See {@link chatDefer}. */
1853+
defer: chatDefer,
18091854
/** Typed chat output stream for writing custom chunks or piping from subtasks. */
18101855
stream: chatStream,
18111856
};

references/ai-chat/src/app/page.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
type ChatMeta = {
1313
id: string;
1414
title: string;
15+
model: string;
1516
createdAt: number;
1617
updatedAt: number;
1718
};

references/ai-chat/src/components/chat-app.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export function ChatApp({
5151

5252
// Model for new chats (before first message is sent)
5353
const [newChatModel, setNewChatModel] = useState(DEFAULT_MODEL);
54+
const [preloadEnabled, setPreloadEnabled] = useState(true);
5455

5556
const handleSessionChange = useCallback(
5657
(chatId: string, session: SessionInfo | null) => {
@@ -98,8 +99,10 @@ export function ChatApp({
9899
setActiveChatId(id);
99100
setMessages([]);
100101
setNewChatModel(DEFAULT_MODEL);
101-
// Eagerly start the run — onPreload fires immediately for initialization
102-
transport.preload(id);
102+
if (preloadEnabled) {
103+
// Eagerly start the run — onPreload fires immediately for initialization
104+
transport.preload(id);
105+
}
103106
}
104107

105108
function handleSelectChat(id: string) {
@@ -149,6 +152,8 @@ export function ChatApp({
149152
onSelectChat={handleSelectChat}
150153
onNewChat={handleNewChat}
151154
onDeleteChat={handleDeleteChat}
155+
preloadEnabled={preloadEnabled}
156+
onPreloadChange={setPreloadEnabled}
152157
/>
153158
<div className="flex-1">
154159
{activeChatId ? (

references/ai-chat/src/components/chat-sidebar.tsx

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type ChatSidebarProps = {
2424
onSelectChat: (id: string) => void;
2525
onNewChat: () => void;
2626
onDeleteChat: (id: string) => void;
27+
preloadEnabled: boolean;
28+
onPreloadChange: (enabled: boolean) => void;
2729
};
2830

2931
export function ChatSidebar({
@@ -32,6 +34,8 @@ export function ChatSidebar({
3234
onSelectChat,
3335
onNewChat,
3436
onDeleteChat,
37+
preloadEnabled,
38+
onPreloadChange,
3539
}: ChatSidebarProps) {
3640
const sorted = [...chats].sort((a, b) => b.updatedAt - a.updatedAt);
3741

@@ -77,6 +81,18 @@ export function ChatSidebar({
7781
</button>
7882
))}
7983
</div>
84+
85+
<div className="shrink-0 border-t border-gray-200 px-3 py-2.5">
86+
<label className="flex items-center gap-2 text-xs text-gray-500 cursor-pointer select-none">
87+
<input
88+
type="checkbox"
89+
checked={preloadEnabled}
90+
onChange={(e) => onPreloadChange(e.target.checked)}
91+
className="rounded border-gray-300"
92+
/>
93+
Preload new chats
94+
</label>
95+
</div>
8096
</div>
8197
);
8298
}

references/ai-chat/src/components/chat.tsx

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,24 @@ function ResearchProgress({ part }: { part: any }) {
110110
);
111111
}
112112

113+
type TtfbEntry = { turn: number; ttfbMs: number };
114+
113115
function DebugPanel({
114116
chatId,
115117
model,
116118
status,
117119
session,
118120
dashboardUrl,
119121
messageCount,
122+
ttfbHistory,
120123
}: {
121124
chatId: string;
122125
model: string;
123126
status: string;
124127
session?: { runId: string; publicAccessToken: string; lastEventId?: string };
125128
dashboardUrl?: string;
126129
messageCount: number;
130+
ttfbHistory: TtfbEntry[];
127131
}) {
128132
const [open, setOpen] = useState(false);
129133

@@ -132,6 +136,12 @@ function DebugPanel({
132136
? `${dashboardUrl}/runs/${session.runId}`
133137
: undefined;
134138

139+
const latestTtfb = ttfbHistory.length > 0 ? ttfbHistory[ttfbHistory.length - 1]! : undefined;
140+
const avgTtfb =
141+
ttfbHistory.length > 0
142+
? Math.round(ttfbHistory.reduce((sum, e) => sum + e.ttfbMs, 0) / ttfbHistory.length)
143+
: undefined;
144+
135145
return (
136146
<div className="shrink-0 border-t border-gray-200 bg-gray-50 text-xs text-gray-500">
137147
<button
@@ -150,6 +160,9 @@ function DebugPanel({
150160
}`}
151161
/>
152162
<span>{status}</span>
163+
{latestTtfb && (
164+
<span className="font-mono text-blue-600">TTFB {latestTtfb.ttfbMs.toLocaleString()}ms</span>
165+
)}
153166
{session?.runId && (
154167
<span className="font-mono">{session.runId.slice(0, 16)}...</span>
155168
)}
@@ -170,6 +183,22 @@ function DebugPanel({
170183
) : (
171184
<Row label="Session" value="none" />
172185
)}
186+
{ttfbHistory.length > 0 && (
187+
<>
188+
<div className="mt-2 border-t border-gray-200 pt-2">
189+
<span className="font-medium text-gray-600">TTFB</span>
190+
{avgTtfb !== undefined && (
191+
<span className="ml-2 text-gray-400">avg {avgTtfb.toLocaleString()}ms</span>
192+
)}
193+
</div>
194+
{ttfbHistory.map((entry) => (
195+
<div key={entry.turn} className="flex items-center gap-2">
196+
<span className="w-24 shrink-0 text-gray-400">Turn {entry.turn}</span>
197+
<span className="font-mono">{entry.ttfbMs.toLocaleString()}ms</span>
198+
</div>
199+
))}
200+
</>
201+
)}
173202
</div>
174203
)}
175204
</div>
@@ -236,6 +265,11 @@ export function Chat({
236265
const [input, setInput] = useState("");
237266
const hasCalledFirstMessage = useRef(false);
238267

268+
// TTFB tracking
269+
const sendTimestamp = useRef<number | null>(null);
270+
const turnCounter = useRef(0);
271+
const [ttfbHistory, setTtfbHistory] = useState<TtfbEntry[]>([]);
272+
239273
const { messages, sendMessage, stop, status, error } = useChat({
240274
id: chatId,
241275
messages: initialMessages,
@@ -257,6 +291,19 @@ export function Chat({
257291
}
258292
}, [messages, chatId, onFirstMessage]);
259293

294+
// TTFB detection: record when first assistant content appears after send
295+
useEffect(() => {
296+
if (status !== "streaming") return;
297+
if (sendTimestamp.current === null) return;
298+
const lastMsg = messages[messages.length - 1];
299+
if (lastMsg?.role === "assistant") {
300+
const ttfbMs = Date.now() - sendTimestamp.current;
301+
const turn = turnCounter.current;
302+
sendTimestamp.current = null;
303+
setTtfbHistory((prev) => [...prev, { turn, ttfbMs }]);
304+
}
305+
}, [status, messages]);
306+
260307
// Pending message to send after the current turn completes
261308
const [pendingMessage, setPendingMessage] = useState<string | null>(null);
262309

@@ -277,6 +324,8 @@ export function Chat({
277324
if (pendingMessage) {
278325
const text = pendingMessage;
279326
setPendingMessage(null);
327+
turnCounter.current++;
328+
sendTimestamp.current = Date.now();
280329
sendMessage({ text }, { metadata: { model } });
281330
}
282331
}, [status, messages, chatId, onMessagesChange, sendMessage, pendingMessage, model]);
@@ -423,6 +472,7 @@ export function Chat({
423472
session={session}
424473
dashboardUrl={dashboardUrl}
425474
messageCount={messages.length}
475+
ttfbHistory={ttfbHistory}
426476
/>
427477

428478
<form
@@ -432,6 +482,8 @@ export function Chat({
432482
if (status === "streaming") {
433483
setPendingMessage(input);
434484
} else {
485+
turnCounter.current++;
486+
sendTimestamp.current = Date.now();
435487
sendMessage({ text: input }, { metadata: { model } });
436488
}
437489
setInput("");

references/ai-chat/src/trigger/chat.ts

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ const userContext = chat.local<{
137137
}>({ id: "userContext" });
138138

139139
// Per-run dynamic tools — loaded from DB in onPreload/onChatStart
140-
const userToolDefs = chat.local<
141-
Array<{ name: string; description: string; responseTemplate: string }>
142-
>({ id: "userToolDefs" });
140+
const userToolDefs = chat.local<{
141+
value: Array<{ name: string; description: string; responseTemplate: string }>;
142+
}>({ id: "userToolDefs" });
143143

144144
// --------------------------------------------------------------------------
145145
// Subtask: deep research — fetches multiple URLs and streams progress
@@ -250,6 +250,7 @@ export const aiChat = chat.task({
250250
warmTimeoutInSeconds: 60,
251251
chatAccessTokenTTL: "2h",
252252
onPreload: async ({ chatId, runId, chatAccessToken, clientData }) => {
253+
if (!clientData) return;
253254
// Eagerly initialize before the user's first message arrives
254255
const user = await prisma.user.upsert({
255256
where: { id: clientData.userId },
@@ -266,7 +267,7 @@ export const aiChat = chat.task({
266267

267268
// Load user-specific dynamic tools
268269
const tools = await prisma.userTool.findMany({ where: { userId: clientData.userId } });
269-
userToolDefs.init(tools);
270+
userToolDefs.init({ value: tools });
270271

271272
// Create chat record and session
272273
await prisma.chat.upsert({
@@ -287,12 +288,8 @@ export const aiChat = chat.task({
287288
},
288289
onChatStart: async ({ chatId, runId, chatAccessToken, clientData, continuation, preloaded }) => {
289290
if (preloaded) {
290-
// Already initialized in onPreload — just update session
291-
await prisma.chatSession.upsert({
292-
where: { id: chatId },
293-
create: { id: chatId, runId, publicAccessToken: chatAccessToken },
294-
update: { runId, publicAccessToken: chatAccessToken },
295-
});
291+
// Everything was already initialized in onPreload — skip entirely.
292+
// The session, chat record, user context, and tools are all set up.
296293
return;
297294
}
298295

@@ -312,7 +309,7 @@ export const aiChat = chat.task({
312309

313310
// Load user-specific dynamic tools
314311
const tools = await prisma.userTool.findMany({ where: { userId: clientData.userId } });
315-
userToolDefs.init(tools);
312+
userToolDefs.init({ value: tools });
316313

317314
if (!continuation) {
318315
await prisma.chat.upsert({
@@ -333,17 +330,10 @@ export const aiChat = chat.task({
333330
update: { runId, publicAccessToken: chatAccessToken },
334331
});
335332
},
336-
onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
337-
// Persist messages BEFORE streaming so mid-stream refresh has the user message
338-
await prisma.chat.update({
339-
where: { id: chatId },
340-
data: { messages: uiMessages as any },
341-
});
342-
await prisma.chatSession.upsert({
343-
where: { id: chatId },
344-
create: { id: chatId, runId, publicAccessToken: chatAccessToken },
345-
update: { runId, publicAccessToken: chatAccessToken },
346-
});
333+
onTurnStart: async ({ chatId, uiMessages }) => {
334+
// Persist messages so mid-stream refresh still shows the user message.
335+
// Deferred — runs in parallel with streaming, awaited before onTurnComplete.
336+
chat.defer(prisma.chat.update({ where: { id: chatId }, data: { messages: uiMessages as any } }));
347337
},
348338
onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId, clientData, stopped }) => {
349339
// Persist final messages + assistant response + stream position

0 commit comments

Comments (0)