Skip to content

Commit cfe56ab

Browse files
committed
feat: add onTurnStart hook, lastEventId support, and stream resume deduplication
1 parent 5ee5959 commit cfe56ab

File tree

14 files changed

+342
-104
lines changed

14 files changed

+342
-104
lines changed

docs/guides/ai-chat.mdx

Lines changed: 128 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ The backend automatically accumulates the full conversation history across turns
147147

148148
The accumulated messages are available in:
149149
- `run()` as `messages` (`ModelMessage[]`) — for passing to `streamText`
150-
- `onTurnComplete()` as `uiMessages` (`UIMessage[]`) — for persistence
150+
- `onTurnStart()` as `uiMessages` (`UIMessage[]`) — for persisting before streaming
151+
- `onTurnComplete()` as `uiMessages` (`UIMessage[]`) — for persisting after the response
151152

152153
## Backend patterns
153154

@@ -258,9 +259,46 @@ export const myChat = chat.task({
258259
`clientData` contains custom data from the frontend — either the `metadata` option on the transport constructor (sent with every message) or the `metadata` option on `sendMessage()` (per-message). See [Client data and metadata](#client-data-and-metadata).
259260
</Tip>
260261

262+
### onTurnStart
263+
264+
Fires at the start of every turn, after message accumulation and `onChatStart` (turn 0), but **before** `run()` executes. Use it to persist messages before streaming begins — so a mid-stream page refresh still shows the user's message.
265+
266+
| Field | Type | Description |
267+
|-------|------|-------------|
268+
| `chatId` | `string` | Chat session ID |
269+
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
270+
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
271+
| `turn` | `number` | Turn number (0-indexed) |
272+
| `runId` | `string` | The Trigger.dev run ID |
273+
| `chatAccessToken` | `string` | Scoped access token for this run |
274+
275+
```ts
276+
export const myChat = chat.task({
277+
id: "my-chat",
278+
onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
279+
await db.chat.update({
280+
where: { id: chatId },
281+
data: { messages: uiMessages },
282+
});
283+
await db.chatSession.upsert({
284+
where: { id: chatId },
285+
create: { id: chatId, runId, publicAccessToken: chatAccessToken },
286+
update: { runId, publicAccessToken: chatAccessToken },
287+
});
288+
},
289+
run: async ({ messages, signal }) => {
290+
return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
291+
},
292+
});
293+
```
294+
295+
<Tip>
296+
By persisting in `onTurnStart`, the user's message is saved to your database before the AI starts streaming. If the user refreshes mid-stream, the message is already there.
297+
</Tip>
298+
261299
### onTurnComplete
262300

263-
Fires after each turn completes — after the response is captured, before waiting for the next message. This is the primary hook for persisting conversations.
301+
Fires after each turn completes — after the response is captured, before waiting for the next message. This is the primary hook for persisting the assistant's response.
264302

265303
| Field | Type | Description |
266304
|-------|------|-------------|
@@ -271,15 +309,23 @@ Fires after each turn completes — after the response is captured, before waiti
271309
| `newUIMessages` | `UIMessage[]` | Only this turn's messages (UI format) |
272310
| `responseMessage` | `UIMessage \| undefined` | The assistant's response for this turn |
273311
| `turn` | `number` | Turn number (0-indexed) |
312+
| `runId` | `string` | The Trigger.dev run ID |
313+
| `chatAccessToken` | `string` | Scoped access token for this run |
314+
| `lastEventId` | `string \| undefined` | Stream position for resumption. Persist this with the session. |
274315

275316
```ts
276317
export const myChat = chat.task({
277318
id: "my-chat",
278-
onTurnComplete: async ({ chatId, uiMessages }) => {
319+
onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
279320
await db.chat.update({
280321
where: { id: chatId },
281322
data: { messages: uiMessages },
282323
});
324+
await db.chatSession.upsert({
325+
where: { id: chatId },
326+
create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
327+
update: { runId, publicAccessToken: chatAccessToken, lastEventId },
328+
});
283329
},
284330
run: async ({ messages, signal }) => {
285331
return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
@@ -291,22 +337,26 @@ export const myChat = chat.task({
291337
Use `uiMessages` to overwrite the full conversation each turn (simplest). Use `newUIMessages` if you prefer to store messages individually — for example, one database row per message.
292338
</Tip>
293339

340+
<Tip>
341+
Persist `lastEventId` alongside the session. When the transport reconnects after a page refresh, it uses this to skip past already-seen events — preventing duplicate messages.
342+
</Tip>
343+
294344
## Persistence
295345

296346
### What needs to be persisted
297347

298348
To build a chat app that survives page refreshes, you need to persist two things:
299349

300-
1. **Messages** — The conversation history. Persisted **server-side** in the task via `onTurnComplete`.
301-
2. **Sessions** — The transport's connection state (`runId`, `publicAccessToken`, `lastEventId`). Persisted **client-side** via `onSessionChange`.
350+
1. **Messages** — The conversation history. Persisted **server-side** in the task via `onTurnStart` and `onTurnComplete`.
351+
2. **Sessions** — The transport's connection state (`runId`, `publicAccessToken`, `lastEventId`). Persisted **server-side** via `onTurnStart` and `onTurnComplete`.
302352

303353
<Note>
304354
Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run — losing the conversation context that was accumulated in the previous run.
305355
</Note>
306356

307-
### Persisting messages (server-side)
357+
### Persisting messages and sessions (server-side)
308358

309-
Messages are stored inside the task itself, so they're durable even if the frontend disconnects mid-conversation.
359+
Both messages and sessions are persisted server-side in the lifecycle hooks. `onTurnStart` saves the user's message before streaming begins, while `onTurnComplete` saves the assistant's response and the `lastEventId` for stream resumption.
310360

311361
```ts trigger/chat.ts
312362
import { chat } from "@trigger.dev/sdk/ai";
@@ -316,16 +366,34 @@ import { db } from "@/lib/db";
316366

317367
export const myChat = chat.task({
318368
id: "my-chat",
319-
onChatStart: async ({ chatId, clientData }) => {
369+
onChatStart: async ({ chatId }) => {
320370
await db.chat.create({
321371
data: { id: chatId, title: "New chat", messages: [] },
322372
});
323373
},
324-
onTurnComplete: async ({ chatId, uiMessages }) => {
374+
onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
375+
// Save user message + session before streaming starts
325376
await db.chat.update({
326377
where: { id: chatId },
327378
data: { messages: uiMessages },
328379
});
380+
await db.chatSession.upsert({
381+
where: { id: chatId },
382+
create: { id: chatId, runId, publicAccessToken: chatAccessToken },
383+
update: { runId, publicAccessToken: chatAccessToken },
384+
});
385+
},
386+
onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
387+
// Save assistant response + stream position after turn completes
388+
await db.chat.update({
389+
where: { id: chatId },
390+
data: { messages: uiMessages },
391+
});
392+
await db.chatSession.upsert({
393+
where: { id: chatId },
394+
create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
395+
update: { runId, publicAccessToken: chatAccessToken, lastEventId },
396+
});
329397
},
330398
run: async ({ messages, signal }) => {
331399
return streamText({
@@ -337,40 +405,34 @@ export const myChat = chat.task({
337405
});
338406
```
339407

340-
### Persisting sessions (frontend)
341-
342-
The `onSessionChange` callback on the transport fires whenever a session's state changes:
408+
### Session cleanup (frontend)
343409

344-
- **Session created** — After triggering a new task run
345-
- **Turn completed** — The `lastEventId` is updated (used for stream resumption)
346-
- **Session removed** — The run ended or failed. `session` is `null`.
410+
Since session creation and updates are handled server-side, the frontend only needs to handle session deletion when a run ends:
347411

348412
```tsx
349413
const transport = useTriggerChatTransport<typeof myChat>({
350414
task: "my-chat",
351415
accessToken: getChatToken,
352416
sessions: loadedSessions, // Restored from DB on page load
353417
onSessionChange: (chatId, session) => {
354-
if (session) {
355-
saveSession(chatId, session); // Server action
356-
} else {
357-
deleteSession(chatId); // Server action
418+
if (!session) {
419+
deleteSession(chatId); // Server action — run ended
358420
}
359421
},
360422
});
361423
```
362424

363425
### Restoring on page load
364426

365-
On page load, fetch both the messages and the session from your database, then pass them to `useChat` and the transport:
427+
On page load, fetch both the messages and the session from your database, then pass them to `useChat` and the transport. Pass `resume: true` to `useChat` when there's an existing conversation — this tells the AI SDK to reconnect to the stream via the transport.
366428

367429
```tsx app/page.tsx
368430
"use client";
369431

370432
import { useEffect, useState } from "react";
371433
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
372434
import { useChat } from "@ai-sdk/react";
373-
import { getChatToken, getChatMessages, getSession } from "@/app/actions";
435+
import { getChatToken, getChatMessages, getSession, deleteSession } from "@/app/actions";
374436

375437
export default function ChatPage({ chatId }: { chatId: string }) {
376438
const [initialMessages, setInitialMessages] = useState([]);
@@ -407,24 +469,28 @@ function ChatClient({ chatId, initialMessages, initialSessions }) {
407469
accessToken: getChatToken,
408470
sessions: initialSessions,
409471
onSessionChange: (id, session) => {
410-
if (session) saveSession(id, session);
411-
else deleteSession(id);
472+
if (!session) deleteSession(id);
412473
},
413474
});
414475

415476
const { messages, sendMessage, stop, status } = useChat({
416477
id: chatId,
417478
messages: initialMessages,
418479
transport,
480+
resume: initialMessages.length > 0, // Resume if there's an existing conversation
419481
});
420482

421483
// ... render UI
422484
}
423485
```
424486

487+
<Info>
488+
`resume: true` causes `useChat` to call `reconnectToStream` on the transport when the component mounts. The transport uses the session's `lastEventId` to skip past already-seen stream events, so the frontend only receives new data. Only enable `resume` when there are existing messages — for brand new chats, there's nothing to reconnect to.
489+
</Info>
490+
425491
### Full example
426492

427-
Putting it all together — a complete chat app with server-side message persistence and session reconnection:
493+
Putting it all together — a complete chat app with server-side persistence, session reconnection, and stream resumption:
428494

429495
<CodeGroup>
430496
```ts trigger/chat.ts
@@ -440,11 +506,29 @@ export const myChat = chat.task({
440506
data: { id: chatId, title: "New chat", messages: [] },
441507
});
442508
},
443-
onTurnComplete: async ({ chatId, uiMessages }) => {
509+
onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
510+
// Persist messages + session before streaming
444511
await db.chat.update({
445512
where: { id: chatId },
446513
data: { messages: uiMessages },
447514
});
515+
await db.chatSession.upsert({
516+
where: { id: chatId },
517+
create: { id: chatId, runId, publicAccessToken: chatAccessToken },
518+
update: { runId, publicAccessToken: chatAccessToken },
519+
});
520+
},
521+
onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
522+
// Persist assistant response + stream position
523+
await db.chat.update({
524+
where: { id: chatId },
525+
data: { messages: uiMessages },
526+
});
527+
await db.chatSession.upsert({
528+
where: { id: chatId },
529+
create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
530+
update: { runId, publicAccessToken: chatAccessToken, lastEventId },
531+
});
448532
},
449533
run: async ({ messages, signal }) => {
450534
return streamText({
@@ -471,19 +555,21 @@ export async function getChatMessages(chatId: string) {
471555
return found?.messages ?? [];
472556
}
473557

474-
export async function getSession(chatId: string) {
475-
return await db.chatSession.findUnique({ where: { id: chatId } });
476-
}
477-
478-
export async function saveSession(
479-
chatId: string,
480-
session: { runId: string; publicAccessToken: string; lastEventId?: string }
481-
) {
482-
await db.chatSession.upsert({
483-
where: { id: chatId },
484-
create: { id: chatId, ...session },
485-
update: session,
486-
});
558+
export async function getAllSessions() {
559+
const sessions = await db.chatSession.findMany();
560+
const result: Record<string, {
561+
runId: string;
562+
publicAccessToken: string;
563+
lastEventId?: string;
564+
}> = {};
565+
for (const s of sessions) {
566+
result[s.id] = {
567+
runId: s.runId,
568+
publicAccessToken: s.publicAccessToken,
569+
lastEventId: s.lastEventId ?? undefined,
570+
};
571+
}
572+
return result;
487573
}
488574

489575
export async function deleteSession(chatId: string) {
@@ -497,27 +583,23 @@ export async function deleteSession(chatId: string) {
497583
import { useChat } from "@ai-sdk/react";
498584
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
499585
import type { myChat } from "@/trigger/chat";
500-
import {
501-
getChatToken,
502-
saveSession,
503-
deleteSession,
504-
} from "@/app/actions";
586+
import { getChatToken, deleteSession } from "@/app/actions";
505587

506588
export function Chat({ chatId, initialMessages, initialSessions }) {
507589
const transport = useTriggerChatTransport<typeof myChat>({
508590
task: "my-chat",
509591
accessToken: getChatToken,
510592
sessions: initialSessions,
511593
onSessionChange: (id, session) => {
512-
if (session) saveSession(id, session);
513-
else deleteSession(id);
594+
if (!session) deleteSession(id);
514595
},
515596
});
516597

517598
const { messages, sendMessage, stop, status } = useChat({
518599
id: chatId,
519600
messages: initialMessages,
520601
transport,
602+
resume: initialMessages.length > 0,
521603
});
522604

523605
return (
@@ -726,6 +808,7 @@ const transport = useTriggerChatTransport({
726808
| `id` | `string` | required | Task identifier |
727809
| `run` | `(payload: ChatTaskRunPayload) => Promise<unknown>` | required | Handler for each turn |
728810
| `onChatStart` | `(event: ChatStartEvent) => Promise<void> \| void` || Fires on turn 0 before `run()` |
811+
| `onTurnStart` | `(event: TurnStartEvent) => Promise<void> \| void` || Fires every turn before `run()` |
729812
| `onTurnComplete` | `(event: TurnCompleteEvent) => Promise<void> \| void` || Fires after each turn completes |
730813
| `maxTurns` | `number` | `100` | Max conversational turns per run |
731814
| `turnTimeout` | `string` | `"1h"` | How long to wait for next message |

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
RealtimeStreamInstance,
77
RealtimeStreamOperationOptions,
88
RealtimeStreamsManager,
9+
StreamWriteResult,
910
} from "./types.js";
1011

1112
export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
@@ -16,7 +17,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1617
) {}
1718
// Track active streams - using a Set allows multiple streams for the same key to coexist
1819
private activeStreams = new Set<{
19-
wait: () => Promise<void>;
20+
wait: () => Promise<StreamWriteResult>;
2021
abortController: AbortController;
2122
}>();
2223

packages/core/src/v3/realtimeStreams/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export class NoopRealtimeStreamsManager implements RealtimeStreamsManager {
1515
options?: RealtimeStreamOperationOptions
1616
): RealtimeStreamInstance<T> {
1717
return {
18-
wait: () => Promise.resolve(),
18+
wait: () => Promise.resolve({}),
1919
get stream(): AsyncIterableStream<T> {
2020
return createAsyncIterableStreamFromAsyncIterable(source);
2121
},

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { AnyZodFetchOptions } from "../zodfetch.js";
44
import { StreamsWriterV1 } from "./streamsWriterV1.js";
55
import { StreamsWriterV2 } from "./streamsWriterV2.js";
6-
import { StreamsWriter } from "./types.js";
6+
import { StreamsWriter, StreamWriteResult } from "./types.js";
77

88
export type StreamInstanceOptions<T> = {
99
apiClient: ApiClient;
@@ -63,8 +63,9 @@ export class StreamInstance<T> implements StreamsWriter {
6363
return streamWriter;
6464
}
6565

66-
public async wait(): Promise<void> {
67-
return this.streamPromise.then((writer) => writer.wait());
66+
public async wait(): Promise<StreamWriteResult> {
67+
const writer = await this.streamPromise;
68+
return writer.wait();
6869
}
6970

7071
public get stream(): AsyncIterableStream<T> {

0 commit comments

Comments
 (0)