Commit 2cb1703

feat(chat): expose typed chat.stream, add deepResearch subtask example, per-chat model persistence, debug panel
- Export chat.stream (typed RealtimeDefinedStream&lt;UIMessageChunk&gt;) for writing custom data to the chat stream
- Add deepResearch subtask using data-* chunks to stream progress back to parent chat via target: root
- Use AI SDK data-research-progress chunk protocol with id-based updates for live progress
- Add ResearchProgress component and generic data-* fallback renderer in frontend
- Persist model per chat in DB (schema + onChatStart), model selector only on new chats
- Add collapsible debug panel showing run ID (with dashboard link), chat ID, model, status, session info
- Document chat.stream API, data-* chunks, and subtask streaming pattern in docs
1 parent a9322d7 commit 2cb1703

File tree

4 files changed: +323 additions, -4

docs/guides/ai-chat.mdx

Lines changed: 135 additions & 0 deletions
@@ -750,6 +750,140 @@ This removes tool invocation parts stuck in `partial-call` state and marks any `
Stop signal delivery is best-effort. There is a small race window where the model may finish before the stop signal arrives, in which case the turn completes normally with `stopped: false`. This is expected and does not require special handling.
</Note>

## Writing to the chat stream

### Custom chunks with `chat.stream`

`chat.stream` is a typed stream bound to the chat output. Use it to write custom `UIMessageChunk` data alongside the AI-generated response — for example, status updates or progress indicators.

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    // Write a custom data part to the chat stream.
    // The AI SDK's data-* chunk protocol adds this to message.parts
    // on the frontend, where you can render it however you like.
    const { waitUntilComplete } = chat.stream.writer({
      execute: ({ write }) => {
        write({
          type: "data-status",
          id: "search-progress",
          data: { message: "Searching the web...", progress: 0.5 },
        });
      },
    });
    await waitUntilComplete();

    // Then stream the AI response
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

<Tip>
Use `data-*` chunk types (e.g. `data-status`, `data-progress`) for custom data. The AI SDK processes these into `DataUIPart` objects in `message.parts` on the frontend. Writing the same `type` + `id` again updates the existing part instead of creating a new one — useful for live progress.
</Tip>
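The update-in-place behavior described in the tip can be pictured as a small upsert over `message.parts`. This is an illustrative sketch of the semantics only; `DataUIPart` here is a simplified stand-in for the AI SDK type, and `upsertDataPart` is a hypothetical helper, not SDK code:

```typescript
// Illustrative sketch of the type + id upsert semantics; not the AI SDK's
// actual implementation. A chunk matching an existing part's type and id
// replaces that part in place; otherwise it is appended.
type DataUIPart = { type: string; id?: string; data: unknown };

function upsertDataPart(parts: DataUIPart[], chunk: DataUIPart): DataUIPart[] {
  const index = parts.findIndex(
    (p) => p.type === chunk.type && p.id !== undefined && p.id === chunk.id
  );
  if (index === -1) return [...parts, chunk];
  const next = [...parts];
  next[index] = chunk; // same type + id: update the existing part
  return next;
}

// Two writes with the same id produce one part holding the latest data
let parts: DataUIPart[] = [];
parts = upsertDataPart(parts, { type: "data-status", id: "s1", data: { progress: 0.5 } });
parts = upsertDataPart(parts, { type: "data-status", id: "s1", data: { progress: 1 } });
```

This is why repeated writes with a stable `id` render as a single live-updating progress indicator rather than a growing list of parts.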
`chat.stream` exposes the full stream API:

| Method | Description |
|--------|-------------|
| `chat.stream.writer(options)` | Write individual chunks via a callback |
| `chat.stream.pipe(stream, options?)` | Pipe a `ReadableStream` or `AsyncIterable` |
| `chat.stream.append(value, options?)` | Append raw data |
| `chat.stream.read(runId, options?)` | Read the stream by run ID |
### Streaming from subtasks

When a tool invokes a subtask via `triggerAndWait`, the subtask can stream directly to the parent chat using `target: "root"`:

```ts
import { chat, ai } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { streamText, generateId } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// A subtask that streams progress back to the parent chat
export const researchTask = schemaTask({
  id: "research",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    const partId = generateId();

    // Write a data-* chunk to the root run's chat stream.
    // The frontend receives this as a DataUIPart in message.parts.
    const { waitUntilComplete } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "in-progress" },
        });
      },
    });
    await waitUntilComplete();

    // Do the work...
    const result = await doResearch(query);

    // Update the same part with the final status
    const { waitUntilComplete: waitDone } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "done", resultCount: result.length },
        });
      },
    });
    await waitDone();

    return result;
  },
});

// The chat task uses it as a tool via ai.tool()
export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      tools: {
        research: ai.tool(researchTask),
      },
    });
  },
});
```

On the frontend, render the custom data part:

```tsx
{message.parts.map((part, i) => {
  if (part.type === "data-research-status") {
    const { query, status, resultCount } = part.data;
    return (
      <div key={i}>
        {status === "done" ? `Found ${resultCount} results` : `Researching "${query}"...`}
      </div>
    );
  }
  // ...other part types
})}
```

The `target` option accepts:

- `"self"` — current run (default)
- `"parent"` — parent task's run
- `"root"` — root task's run (the chat task)
- A specific run ID string
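Target resolution can be pictured as a simple lookup against the run's context. This is an illustrative sketch only; the `RunContext` shape and its field names (`selfRunId`, `parentRunId`, `rootRunId`) are assumptions made for the example, not the SDK's actual internals:

```typescript
// Hypothetical context shape for illustration; not the SDK's real type.
interface RunContext {
  selfRunId: string;
  parentRunId?: string; // absent for a top-level run
  rootRunId?: string; // absent for a top-level run
}

// Sketch of how a target value could map to a concrete run ID.
function resolveTargetRunId(target: string, ctx: RunContext): string {
  switch (target) {
    case "self":
      return ctx.selfRunId;
    case "parent":
      // A top-level run has no parent; fall back to itself
      return ctx.parentRunId ?? ctx.selfRunId;
    case "root":
      // A top-level run is its own root
      return ctx.rootRunId ?? ctx.selfRunId;
    default:
      // Any other string is treated as a literal run ID
      return target;
  }
}

const ctx: RunContext = { selfRunId: "run_c", parentRunId: "run_b", rootRunId: "run_a" };
const resolved = {
  self: resolveTargetRunId("self", ctx),
  root: resolveTargetRunId("root", ctx),
  literal: resolveTargetRunId("run_x", ctx),
};
```

In the `deepResearch` example above, `target: "root"` is what lets a nested subtask write to the chat task's stream rather than its own.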
## Client data and metadata

### Transport-level client data

@@ -1075,6 +1209,7 @@ See [onTurnComplete](#onturncomplete) for the full field reference.

| `chat.setWarmTimeoutInSeconds(seconds)` | Override warm timeout at runtime |
| `chat.isStopped()` | Check if the current turn was stopped by the user (works anywhere during a turn) |
| `chat.cleanupAbortedParts(message)` | Remove incomplete parts from a stopped response message |
| `chat.stream` | Typed chat output stream — use `.writer()`, `.pipe()`, `.append()`, `.read()` |

## Self-hosting

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 26 additions & 1 deletion
@@ -14,7 +14,7 @@ import {
   type TaskSchema,
   type TaskWithSchema,
 } from "@trigger.dev/core/v3";
-import type { ModelMessage, UIMessage } from "ai";
+import type { ModelMessage, UIMessage, UIMessageChunk } from "ai";
 import type { StreamWriteResult } from "@trigger.dev/core/v3";
 import { convertToModelMessages, dynamicTool, generateId as generateMessageId, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
 import { type Attributes, trace } from "@opentelemetry/api";
@@ -175,6 +175,29 @@ export const CHAT_STREAM_KEY = _CHAT_STREAM_KEY;
 // Re-export input stream IDs for advanced usage
 export { CHAT_MESSAGES_STREAM_ID, CHAT_STOP_STREAM_ID };

+/**
+ * Typed chat output stream. Provides `.writer()`, `.pipe()`, `.append()`,
+ * and `.read()` methods pre-bound to the chat stream key and typed to `UIMessageChunk`.
+ *
+ * Use from within a `chat.task` run to write custom chunks:
+ * ```ts
+ * const { waitUntilComplete } = chat.stream.writer({
+ *   execute: ({ write }) => {
+ *     write({ type: "text-start", id: "status-1" });
+ *     write({ type: "text-delta", id: "status-1", delta: "Processing..." });
+ *     write({ type: "text-end", id: "status-1" });
+ *   },
+ * });
+ * await waitUntilComplete();
+ * ```
+ *
+ * Use from a subtask to stream back to the parent chat:
+ * ```ts
+ * chat.stream.pipe(myStream, { target: "root" });
+ * ```
+ */
+const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
+
 /**
  * The wire payload shape sent by `TriggerChatTransport`.
  * Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.
@@ -1452,6 +1475,8 @@ export const chat = {
   isStopped,
   /** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */
   cleanupAbortedParts,
+  /** Typed chat output stream for writing custom chunks or piping from subtasks. */
+  stream: chatStream,
 };

 /**

references/ai-chat/src/components/chat.tsx

Lines changed: 55 additions & 0 deletions
@@ -70,6 +70,46 @@ function ToolInvocation({ part }: { part: any }) {
   );
 }

+function ResearchProgress({ part }: { part: any }) {
+  const data = part.data as {
+    status: "fetching" | "done";
+    query: string;
+    current: number;
+    total: number;
+    currentUrl?: string;
+    completedUrls: string[];
+  };
+
+  const isDone = data.status === "done";
+
+  return (
+    <div className="my-2 rounded border border-blue-200 bg-blue-50 px-3 py-2 text-xs">
+      <div className="flex items-center gap-2 font-medium text-blue-700">
+        {isDone ? (
+          <span className="text-green-600">&#10003;</span>
+        ) : (
+          <span className="inline-block h-3 w-3 animate-spin rounded-full border-2 border-blue-300 border-t-blue-600" />
+        )}
+        <span>
+          {isDone
+            ? `Research complete — ${data.total} sources fetched`
+            : `Researching "${data.query}" (${data.current}/${data.total})`}
+        </span>
+      </div>
+      {data.currentUrl && !isDone && (
+        <div className="mt-1 truncate text-blue-500">Fetching {data.currentUrl}</div>
+      )}
+      {data.completedUrls.length > 0 && (
+        <div className="mt-1 space-y-0.5 text-blue-400">
+          {data.completedUrls.map((url, i) => (
+            <div key={i} className="truncate">&#10003; {url}</div>
+          ))}
+        </div>
+      )}
+    </div>
+  );
+}
+
 function DebugPanel({
   chatId,
   model,
@@ -321,10 +361,25 @@ export function Chat({
             );
           }

+          if (part.type === "data-research-progress") {
+            return <ResearchProgress key={i} part={part} />;
+          }
+
           if (part.type.startsWith("tool-") || part.type === "dynamic-tool") {
             return <ToolInvocation key={i} part={part} />;
           }

+          if (part.type.startsWith("data-")) {
+            return (
+              <div key={i} className="my-1 rounded border border-gray-200 bg-gray-50 p-2 text-xs text-gray-500">
+                <span className="font-medium">{part.type}</span>
+                <pre className="mt-1 overflow-x-auto whitespace-pre-wrap">
+                  {JSON.stringify((part as any).data, null, 2)}
+                </pre>
+              </div>
+            );
+          }
+
           return null;
         })}
       </div>

references/ai-chat/src/trigger/chat.ts

Lines changed: 107 additions & 3 deletions
@@ -1,5 +1,6 @@
-import { chat } from "@trigger.dev/sdk/ai";
-import { streamText, tool, stepCountIs } from "ai";
+import { chat, ai } from "@trigger.dev/sdk/ai";
+import { schemaTask } from "@trigger.dev/sdk";
+import { streamText, tool, stepCountIs, generateId } from "ai";
 import type { LanguageModel } from "ai";
 import { openai } from "@ai-sdk/openai";
 import { anthropic } from "@ai-sdk/anthropic";
@@ -135,6 +136,105 @@ const userContext = chat.local<{
   messageCount: number;
 }>();

+// --------------------------------------------------------------------------
+// Subtask: deep research — fetches multiple URLs and streams progress
+// back to the parent chat via chat.stream using data-* chunks
+// --------------------------------------------------------------------------
+export const deepResearch = schemaTask({
+  id: "deep-research",
+  description:
+    "Research a topic by fetching multiple URLs and synthesizing the results. " +
+    "Streams progress updates to the chat as it works.",
+  schema: z.object({
+    query: z.string().describe("The research query or topic"),
+    urls: z.array(z.string().url()).describe("URLs to fetch and analyze"),
+  }),
+  run: async ({ query, urls }) => {
+    const partId = generateId();
+    const results: { url: string; status: number; snippet: string }[] = [];
+
+    // Stream progress using data-research-progress chunks.
+    // Using the same id means each write updates the same part in the message.
+    function streamProgress(progress: {
+      status: "fetching" | "done";
+      query: string;
+      current: number;
+      total: number;
+      currentUrl?: string;
+      completedUrls: string[];
+    }) {
+      return chat.stream.writer({
+        target: "root",
+        execute: ({ write }) => {
+          write({
+            type: "data-research-progress" as any,
+            id: partId,
+            data: progress,
+          });
+        },
+      });
+    }
+
+    for (let i = 0; i < urls.length; i++) {
+      const url = urls[i]!;
+
+      // Update progress — fetching
+      const { waitUntilComplete } = streamProgress({
+        status: "fetching",
+        query,
+        current: i + 1,
+        total: urls.length,
+        currentUrl: url,
+        completedUrls: results.map((r) => r.url),
+      });
+      await waitUntilComplete();
+
+      try {
+        const response = await fetch(url);
+        let text = await response.text();
+        const contentType = response.headers.get("content-type") ?? "";
+
+        if (contentType.includes("html")) {
+          text = text
+            .replace(/<script[\s\S]*?<\/script>/gi, "")
+            .replace(/<style[\s\S]*?<\/style>/gi, "")
+            .replace(/<[^>]+>/g, " ")
+            .replace(/&nbsp;/g, " ")
+            .replace(/&amp;/g, "&")
+            .replace(/&lt;/g, "<")
+            .replace(/&gt;/g, ">")
+            .replace(/\s+/g, " ")
+            .trim();
+        }
+
+        results.push({
+          url,
+          status: response.status,
+          snippet: text.slice(0, 500),
+        });
+      } catch (err) {
+        results.push({
+          url,
+          status: 0,
+          snippet: `Error: ${err instanceof Error ? err.message : String(err)}`,
+        });
+      }
+    }
+
+    // Final progress update — done
+    const { waitUntilComplete: waitForDone } = streamProgress({
+      status: "done",
+      query,
+      current: urls.length,
+      total: urls.length,
+      completedUrls: results.map((r) => r.url),
+    });
+    await waitForDone();
+
+    return { query, results };
+  },
+});
+
 export const aiChat = chat.task({
   id: "ai-chat",
   clientDataSchema: z.object({ model: z.string().optional(), userId: z.string() }),
@@ -228,7 +328,11 @@ export const aiChat = chat.task({
       model: getModel(modelId),
       system: `You are a helpful assistant for ${userContext.name} (${userContext.plan} plan). Be concise and friendly.`,
       messages,
-      tools: { inspectEnvironment, webFetch },
+      tools: {
+        inspectEnvironment,
+        webFetch,
+        deepResearch: ai.tool(deepResearch),
+      },
       stopWhen: stepCountIs(10),
       abortSignal: stopSignal,
       providerOptions: {
0 commit comments