Skip to content

Commit d6b9c4a

Browse files
committed
Minor fixes around reconnecting streams
1 parent a214660 commit d6b9c4a

File tree

2 files changed

+88
-22
lines changed

2 files changed

+88
-22
lines changed

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
210210
| undefined;
211211

212212
private sessions: Map<string, ChatSessionState> = new Map();
213-
private activeReconnects: Map<string, AbortController> = new Map();
213+
private activeStreams: Map<string, AbortController> = new Map();
214214

215215
constructor(options: TriggerChatTransportOptions) {
216216
this.taskId = options.task;
@@ -277,6 +277,15 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
277277

278278
const apiClient = new ApiClient(this.baseURL, session.publicAccessToken);
279279
await apiClient.sendInputStream(session.runId, CHAT_MESSAGES_STREAM_ID, minimalPayload);
280+
281+
// Cancel any active reconnect stream for this chatId before
282+
// opening a new subscription for the new turn.
283+
const activeStream = this.activeStreams.get(chatId);
284+
if (activeStream) {
285+
activeStream.abort();
286+
this.activeStreams.delete(chatId);
287+
}
288+
280289
return this.subscribeToStream(
281290
session.runId,
282291
session.publicAccessToken,
@@ -331,20 +340,21 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
331340
return null;
332341
}
333342

334-
// Abort any previous reconnect for this chatId (e.g. React strict mode
335-
// double-firing the effect) to avoid duplicate SSE connections.
336-
const prev = this.activeReconnects.get(options.chatId);
337-
if (prev) {
338-
prev.abort();
343+
// Deduplicate: if there's already an active stream for this chatId,
344+
// return null so the second caller no-ops.
345+
if (this.activeStreams.has(options.chatId)) {
346+
return null;
339347
}
340-
const reconnectAbort = new AbortController();
341-
this.activeReconnects.set(options.chatId, reconnectAbort);
348+
349+
const abortController = new AbortController();
350+
this.activeStreams.set(options.chatId, abortController);
342351

343352
return this.subscribeToStream(
344353
session.runId,
345354
session.publicAccessToken,
346-
reconnectAbort.signal,
347-
options.chatId
355+
abortController.signal,
356+
options.chatId,
357+
{ sendStopOnAbort: false }
348358
);
349359
};
350360

@@ -408,7 +418,8 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
408418
runId: string,
409419
accessToken: string,
410420
abortSignal: AbortSignal | undefined,
411-
chatId?: string
421+
chatId?: string,
422+
options?: { sendStopOnAbort?: boolean }
412423
): ReadableStream<UIMessageChunk> {
413424
const headers: Record<string, string> = {
414425
Authorization: `Bearer ${accessToken}`,
@@ -427,13 +438,14 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
427438
? AbortSignal.any([abortSignal, internalAbort.signal])
428439
: internalAbort.signal;
429440

430-
// When the caller aborts (user calls stop()), send a stop signal to the
431-
// running task via input streams, then close the SSE connection.
441+
// When the caller aborts (user calls stop()), close the SSE connection.
442+
// Only send a stop signal to the task if this is a user-initiated stop
443+
// (sendStopOnAbort), not an internal stream management abort.
432444
if (abortSignal) {
433445
abortSignal.addEventListener(
434446
"abort",
435447
() => {
436-
if (session) {
448+
if (options?.sendStopOnAbort !== false && session) {
437449
session.skipToTurnComplete = true;
438450
const api = new ApiClient(this.baseURL, session.publicAccessToken);
439451
api
@@ -468,14 +480,6 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
468480
const { done, value } = await reader.read();
469481

470482
if (done) {
471-
// Only delete session if the stream ended naturally (not aborted by stop).
472-
// When the user clicks stop, the abort closes the SSE reader which
473-
// returns done=true, but the run is still alive and waiting for
474-
// the next message via input streams.
475-
if (chatId && !combinedSignal.aborted) {
476-
this.sessions.delete(chatId);
477-
this.notifySessionChange(chatId, null);
478-
}
479483
controller.close();
480484
return;
481485
}

references/ai-chat/README.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# AI Chat Reference App
2+
3+
A multi-turn chat app built with the AI SDK's `useChat` hook and Trigger.dev's `chat.task`. Conversations run as durable Trigger.dev tasks with realtime streaming, automatic message accumulation, and persistence across page refreshes.
4+
5+
## Data Models
6+
7+
### Chat
8+
9+
The conversation itself — your application data.
10+
11+
| Column | Description |
12+
| ---------- | ---------------------------------------- |
13+
| `id` | Unique chat ID (generated on the client) |
14+
| `title` | Display title for the sidebar |
15+
| `messages` | Full `UIMessage[]` history (JSON) |
16+
17+
A Chat lives forever (until the user deletes it). It is independent of any particular Trigger.dev run.
18+
19+
### ChatSession
20+
21+
The transport's connection state for a chat — what the frontend needs to reconnect to the same Trigger.dev run after a page refresh.
22+
23+
| Column | Description |
24+
| ------------------- | --------------------------------------------------------------------------- |
25+
| `id` | Same as the chat ID (1:1 relationship) |
26+
| `runId` | The Trigger.dev run handling this conversation |
27+
| `publicAccessToken` | Scoped token for reading the run's stream and sending input stream messages |
28+
| `lastEventId` | Stream position — used to resume without replaying old events |
29+
30+
A Chat can outlive many ChatSessions. When the run ends (turn timeout, max turns reached, crash), the ChatSession is gone but the Chat and its messages remain. The next message from the user starts a fresh run and creates a new ChatSession for the same Chat.
31+
32+
**Think of it as: Chat = the conversation, ChatSession = the live connection to the run handling it.**
33+
34+
## Lifecycle Hooks
35+
36+
Persistence is handled server-side in the Trigger.dev task via three hooks:
37+
38+
- **`onChatStart`** — Creates the Chat and ChatSession records when a new conversation starts (turn 0).
39+
- **`onTurnStart`** — Saves messages and updates the session _before_ streaming begins, so a mid-stream page refresh still shows the user's message.
40+
- **`onTurnComplete`** — Saves the assistant's response and the `lastEventId` for stream resumption.
41+
42+
## Setup
43+
44+
```bash
45+
# From the repo root
46+
pnpm run docker # Start PostgreSQL, Redis, Electric
47+
pnpm run db:migrate # Run webapp migrations
48+
pnpm run db:seed # Seed the database
49+
50+
# Set up the reference app's database
51+
cd references/ai-chat
52+
cp .env.example .env # Edit DATABASE_URL if needed
53+
npx prisma migrate deploy
54+
55+
# Build and run
56+
pnpm run build --filter trigger.dev --filter @trigger.dev/sdk
57+
pnpm run dev --filter webapp # In one terminal
58+
cd references/ai-chat && pnpm exec trigger dev # In another
59+
cd references/ai-chat && pnpm run dev # In another
60+
```
61+
62+
Open http://localhost:3000 to use the chat app.

0 commit comments

Comments
 (0)