Skip to content

Commit dbc0034

Browse files
committed
feat(sdk): chat.agent — durable conversational task runtime
Adds chat.agent({...}), a session-aware task definition for AI chat agents. The runtime sits on top of the Sessions primitive and handles: - Delta-only wire: each /in/append carries at most one message; agent rebuilds prior history at run boot from an S3 snapshot + session.out replay tail. Awaited snapshot writes after every onTurnComplete keep the chain durable across idle suspends. - Lifecycle hooks: onChatStart, onTurnStart, onTurnComplete, onAction, onValidateMessages, hydrateMessages (short-circuits snapshot+replay if the customer owns history). - chat.history read primitives for HITL flows (getPendingToolCalls, getResolvedToolCalls, extractNewToolResults, findMessage, all/getChain). - chat.local — per-run typed data with Proxy access + dirty tracking. - chat.headStart — first-turn TTFC bridge via a customer HTTP handler. - oomMachine — one-shot OOM-retry on a larger machine; cutoff derived from latest trigger:turn-complete chunk on session.out. - Actions are no longer turns — onAction may mutate via chat.history.* and may return a StreamTextResult without bumping the turn counter. - gen_ai.conversation.id stamped on chat spans + metrics for telemetry. - Skills runtime (loadable per-agent and per-task) + agent skills bundling. Snapshot + replay integration tests in apps/webapp/test/. mockChatAgent test harness updated for the new wire. Includes the chat-agent, chat-agent-delta-wire-snapshots, chat-history-read-primitives, chat-head-start, chat-actions-no-turn, chat-session-attributes, agent-skills, and mock-chat-agent-test-harness changesets.
1 parent b84d537 commit dbc0034

56 files changed

Lines changed: 17924 additions & 151 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.changeset/agent-skills.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
"@trigger.dev/build": patch
5+
"trigger.dev": patch
6+
---
7+
8+
Add Agent Skills for `chat.agent`. Drop a folder with a `SKILL.md` and any helper scripts/references next to your task code, register it with `skills.define({ id, path })`, and the CLI bundles it into the deploy image automatically — no `trigger.config.ts` changes. The agent gets a one-line summary in its system prompt and discovers full instructions on demand via `loadSkill`, with `bash` and `readFile` tools scoped per-skill (path-traversal guards, output caps, abort-signal propagation).
9+
10+
```ts
11+
const pdfSkill = skills.define({ id: "pdf-extract", path: "./skills/pdf-extract" });
12+
13+
chat.skills.set([await pdfSkill.local()]);
14+
```
15+
16+
Built on the [AI SDK cookbook pattern](https://ai-sdk.dev/cookbook/guides/agent-skills) — portable across providers. SDK + CLI only for now; dashboard-editable `SKILL.md` text is on the roadmap.

.changeset/chat-actions-no-turn.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
---
4+
5+
`chat.agent` actions are no longer treated as turns. They fire `hydrateMessages` and `onAction` only — no `onTurnStart` / `prepareMessages` / `onBeforeTurnComplete` / `onTurnComplete`, no `run()`, no turn-counter increment. The trace span is named `chat action` instead of `chat turn N`.
6+
7+
`onAction` can now return a `StreamTextResult`, `string`, or `UIMessage` to produce a model response from the action; returning `void` (the previous and now default) is side-effect-only.
8+
9+
**Migration**: if you previously had `run()` branching on `payload.trigger === "action"`, return your `streamText(...)` from `onAction` instead. If you persisted in `onTurnComplete`, do that work inside `onAction`. For any other state-only action, just remove your skip-the-model workaround — the default is now correct.
10+
11+
```ts
12+
// before
13+
onAction: async ({ action }) => {
14+
if (action.type === "regenerate") {
15+
chat.store.set({ skipModelCall: false });
16+
chat.history.slice(0, -1);
17+
}
18+
},
19+
run: async ({ messages, signal }) => {
20+
if (chat.store.get()?.skipModelCall) return;
21+
return streamText({ model, messages, abortSignal: signal });
22+
},
23+
24+
// after
25+
onAction: async ({ action, messages, signal }) => {
26+
if (action.type === "regenerate") {
27+
chat.history.slice(0, -1);
28+
return streamText({ model, messages, abortSignal: signal });
29+
}
30+
},
31+
run: async ({ messages, signal }) =>
32+
streamText({ model, messages, abortSignal: signal }),
33+
```
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
`chat.agent` wire is now delta-only — clients ship at most one new message per `.in/append` instead of the full `UIMessage[]` history. The agent rebuilds prior history at run boot from a JSON snapshot in object storage plus a `wait=0` replay of the `session.out` tail. Long chats stop hitting the 512 KiB body cap on `/realtime/v1/sessions/{id}/in/append`. Snapshot writes happen after every `onTurnComplete`, awaited so they survive idle suspend; reads happen only at run boot. Registering a `hydrateMessages` hook short-circuits both the snapshot read/write and the replay — the customer is the source of truth for history.
7+
8+
Custom transports that constructed `ChatTaskWirePayload` directly need to drop the `messages: UIMessage[]` field and use `message?: UIMessage` (singular). Built-in transports (`TriggerChatTransport`, `AgentChat`) handle the change below the customer-facing surface — most apps need no changes. Configure object-store env vars (`OBJECT_STORE_*`) on your webapp deployment if you haven't already; without an object store and without `hydrateMessages`, conversations don't survive run boundaries.

.changeset/chat-agent.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Run AI chats as durable Trigger.dev tasks. Define the agent in one function, wire `useChat` to it from React, and the conversation survives page refreshes, network blips, and process restarts — with built-in support for tools, HITL approvals, multi-turn state, and stop-mid-stream cancellation.
7+
8+
```ts
9+
import { chat } from "@trigger.dev/sdk/ai";
10+
import { streamText } from "ai";
11+
import { openai } from "@ai-sdk/openai";
12+
13+
export const myChat = chat.agent({
14+
id: "my-chat",
15+
run: async ({ messages, signal }) =>
16+
streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }),
17+
});
18+
```
19+
20+
```tsx
21+
import { useChat } from "@ai-sdk/react";
22+
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
23+
24+
const transport = useTriggerChatTransport({ task: "my-chat", accessToken });
25+
const { messages, sendMessage } = useChat({ transport });
26+
```
27+
28+
Lifecycle hooks (`onPreload`, `onTurnStart`, `onTurnComplete`, etc.) cover the common needs around persistence, validation, and post-turn work. `chat.store` gives you a typed shared-data slot the agent and client both read and write. `chat.endRun()` exits cleanly when the agent decides it's done. The transport's `watch` mode lets a dashboard tab observe a run without driving it.
29+
30+
Drops the pre-Sessions chat stream constants (`CHAT_STREAM_KEY`, `CHAT_MESSAGES_STREAM_ID`, `CHAT_STOP_STREAM_ID`) — migrate to `sessions.open(id).out` / `.in`.

.changeset/chat-head-start.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
---
4+
5+
Add `chat.headStart` — an opt-in fast-path that runs the first turn's `streamText` step in your warm Next.js / Hono / Workers / Express handler while the trigger agent run boots in parallel. Cold-start TTFC drops by ~50% on the first message; the agent owns step 2+ (tool execution, persistence, hooks) so heavy deps stay where they belong.
6+
7+
```ts
8+
// app/api/chat/route.ts (Next.js / any Web Fetch framework)
9+
import { chat } from "@trigger.dev/sdk/chat-server";
10+
import { streamText } from "ai";
11+
import { openai } from "@ai-sdk/openai";
12+
import { headStartTools } from "@/lib/chat-tools-schemas"; // schema-only
13+
14+
export const POST = chat.headStart({
15+
agentId: "ai-chat",
16+
run: async ({ chat: chatHelper }) =>
17+
streamText({
18+
...chatHelper.toStreamTextOptions({ tools: headStartTools }),
19+
model: openai("gpt-4o-mini"),
20+
system: "You are a helpful AI assistant.",
21+
}),
22+
});
23+
```
24+
25+
```tsx
26+
// browser — opt in by pointing the transport at your handler
27+
const transport = useTriggerChatTransport({
28+
task: "ai-chat",
29+
accessToken,
30+
headStart: "/api/chat", // first-turn-only; turn 2+ bypasses the endpoint
31+
});
32+
```
33+
34+
For Node-only frameworks (Express, Fastify, Koa, raw `node:http`) use `chat.toNodeListener(handler)` to bridge the Web Fetch handler to `(req, res)`. Adds a new `@trigger.dev/sdk/chat-server` subpath; bundle stays Web Fetch–only with no `node:*` imports.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
"@trigger.dev/sdk": minor
3+
---
4+
5+
Add read primitives to `chat.history` for HITL flows: `getPendingToolCalls()`, `getResolvedToolCalls()`, `extractNewToolResults(message)`, `getChain()`, and `findMessage(messageId)`. These lift the accumulator-walking logic that customers building human-in-the-loop tools were re-implementing into the SDK.
6+
7+
Use `getPendingToolCalls()` to gate fresh user turns while a tool call is awaiting an answer. Use `extractNewToolResults(message)` to dedup tool results when persisting to your own store — the helper returns only the parts whose `toolCallId` is not already resolved on the chain.
8+
9+
```ts
10+
const pending = chat.history.getPendingToolCalls();
11+
if (pending.length > 0) {
12+
// an addToolOutput is expected before a new user message
13+
}
14+
15+
onTurnComplete: async ({ responseMessage }) => {
16+
const newResults = chat.history.extractNewToolResults(responseMessage);
17+
for (const r of newResults) {
18+
await db.toolResults.upsert({ id: r.toolCallId, output: r.output, errorText: r.errorText });
19+
}
20+
};
21+
```
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Stamp `gen_ai.conversation.id` (the chat id) on every span and metric emitted from inside a `chat.task` or `chat.agent` run. Lets you filter dashboard spans, runs, and metrics by the chat conversation that produced them — independent of the run boundary, so multi-run chats correlate cleanly. No code changes required on the user side.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Unit-test `chat.agent` definitions offline with `mockChatAgent` from `@trigger.dev/sdk/ai/test`. Drives a real agent's turn loop in-process — no network, no task runtime — so you can send messages, actions, and stop signals via driver methods, inspect captured output chunks, and verify hooks fire. Pairs with `MockLanguageModelV3` from `ai/test` for model mocking. `setupLocals` lets you pre-seed `locals` (DB clients, service stubs) before `run()` starts.
7+
8+
The broader `runInMockTaskContext` harness it's built on lives at `@trigger.dev/core/v3/test` — useful for unit-testing any task code, not just chat.
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Plan F.3: integration test that round-trips a `ChatSnapshotV1` blob
2+
// through the SDK's snapshot helpers + a real MinIO backing store. Mirrors
3+
// the testcontainer pattern from `objectStore.test.ts`.
4+
//
5+
// What this verifies end-to-end:
6+
// - SDK's `writeChatSnapshot` calls `apiClient.createUploadPayloadUrl`
7+
// to mint a presigned PUT, then PUTs JSON to it.
8+
// - SDK's `readChatSnapshot` calls `apiClient.getPayloadUrl` to mint a
9+
// presigned GET, then fetches and parses.
10+
// - The webapp's `generatePresignedUrl` produces URLs MinIO accepts.
11+
// - The blob round-trips with `version: 1` shape preserved.
12+
// - 404 (no snapshot for a fresh session) returns `undefined`, not an
13+
// error.
14+
//
15+
// This is the integration safety net behind the unit tests in
16+
// `packages/trigger-sdk/test/chat-snapshot.test.ts` — those tests mock
17+
// `fetch`; this one drives a real S3-compatible backend.
18+
19+
import { postgresAndMinioTest } from "@internal/testcontainers";
20+
import { apiClientManager } from "@trigger.dev/core/v3";
21+
import {
22+
__readChatSnapshotProductionPathForTests as readChatSnapshot,
23+
__writeChatSnapshotProductionPathForTests as writeChatSnapshot,
24+
type ChatSnapshotV1,
25+
} from "@trigger.dev/sdk/ai";
26+
import type { UIMessage } from "ai";
27+
import { afterEach, describe, expect, vi } from "vitest";
28+
import { env } from "~/env.server";
29+
import { generatePresignedUrl } from "~/v3/objectStore.server";
30+
31+
vi.setConfig({ testTimeout: 60_000 });
32+
33+
// ── Helpers ────────────────────────────────────────────────────────────
34+
35+
function makeSnapshot(opts: { messages?: UIMessage[]; lastOutEventId?: string } = {}): ChatSnapshotV1 {
36+
return {
37+
version: 1,
38+
savedAt: 1_700_000_000_000,
39+
messages: opts.messages ?? [
40+
{
41+
id: "u-1",
42+
role: "user",
43+
parts: [{ type: "text", text: "hello" }],
44+
},
45+
{
46+
id: "a-1",
47+
role: "assistant",
48+
parts: [{ type: "text", text: "world" }],
49+
},
50+
],
51+
lastOutEventId: opts.lastOutEventId ?? "evt-42",
52+
lastOutTimestamp: 1_700_000_000_500,
53+
};
54+
}
55+
56+
/**
57+
* Stub `apiClientManager.clientOrThrow()` so the SDK helpers see a fake
58+
* api client whose `getPayloadUrl` / `createUploadPayloadUrl` return
59+
* presigned URLs minted by the webapp's real `generatePresignedUrl`
60+
* (which signs against MinIO).
61+
*
62+
* The SDK helpers internally do `fetch(presignedUrl, ...)` to read/write
63+
* the blob, so MinIO ends up holding the actual bytes.
64+
*/
65+
function stubApiClient(opts: { projectRef: string; envSlug: string }) {
66+
vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue({
67+
async getPayloadUrl(filename: string) {
68+
const result = await generatePresignedUrl(opts.projectRef, opts.envSlug, filename, "GET");
69+
if (!result.success) throw new Error(result.error);
70+
return { presignedUrl: result.url };
71+
},
72+
async createUploadPayloadUrl(filename: string) {
73+
const result = await generatePresignedUrl(opts.projectRef, opts.envSlug, filename, "PUT");
74+
if (!result.success) throw new Error(result.error);
75+
return { presignedUrl: result.url };
76+
},
77+
} as never);
78+
}
79+
80+
// Suppress noisy warnings from logger.warn during error-path tests.
81+
let warnSpy: ReturnType<typeof vi.spyOn>;
82+
83+
afterEach(() => {
84+
vi.restoreAllMocks();
85+
warnSpy?.mockRestore();
86+
});
87+
88+
// ── Tests ──────────────────────────────────────────────────────────────
89+
90+
describe("chat snapshot integration (MinIO + SDK helpers)", () => {
91+
postgresAndMinioTest("round-trips a snapshot through real MinIO", async ({ minioConfig }) => {
92+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
93+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
94+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
95+
env.OBJECT_STORE_REGION = minioConfig.region;
96+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
97+
98+
stubApiClient({ projectRef: "proj_snap_rt", envSlug: "dev" });
99+
100+
const sessionId = "sess_round_trip_1";
101+
const snapshot = makeSnapshot();
102+
103+
// Write through the SDK helper — should land in MinIO at
104+
// `packets/proj_snap_rt/dev/sessions/sess_round_trip_1/snapshot.json`.
105+
await writeChatSnapshot(sessionId, snapshot);
106+
107+
// Read back through the SDK helper — should reconstruct the original.
108+
const result = await readChatSnapshot(sessionId);
109+
110+
expect(result).toEqual(snapshot);
111+
});
112+
113+
postgresAndMinioTest("returns undefined for a fresh session with no snapshot", async ({ minioConfig }) => {
114+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
115+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
116+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
117+
env.OBJECT_STORE_REGION = minioConfig.region;
118+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
119+
120+
stubApiClient({ projectRef: "proj_snap_404", envSlug: "dev" });
121+
122+
warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
123+
124+
// Session never had a snapshot written — read returns undefined.
125+
const result = await readChatSnapshot("sess_never_existed");
126+
expect(result).toBeUndefined();
127+
});
128+
129+
postgresAndMinioTest("overwrites a prior snapshot in place (single-writer)", async ({ minioConfig }) => {
130+
// The runtime guarantees one attempt alive at a time, and
131+
// `writeChatSnapshot` runs awaited after `onTurnComplete`. Verify
132+
// that a second write to the same key replaces the first cleanly —
133+
// the read-after-write reflects the latest blob.
134+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
135+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
136+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
137+
env.OBJECT_STORE_REGION = minioConfig.region;
138+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
139+
140+
stubApiClient({ projectRef: "proj_snap_overwrite", envSlug: "dev" });
141+
142+
const sessionId = "sess_overwrite";
143+
144+
const turn1 = makeSnapshot({
145+
messages: [
146+
{ id: "u-1", role: "user", parts: [{ type: "text", text: "first" }] },
147+
],
148+
lastOutEventId: "evt-turn1",
149+
});
150+
const turn2 = makeSnapshot({
151+
messages: [
152+
{ id: "u-1", role: "user", parts: [{ type: "text", text: "first" }] },
153+
{ id: "a-1", role: "assistant", parts: [{ type: "text", text: "reply-1" }] },
154+
{ id: "u-2", role: "user", parts: [{ type: "text", text: "second" }] },
155+
{ id: "a-2", role: "assistant", parts: [{ type: "text", text: "reply-2" }] },
156+
],
157+
lastOutEventId: "evt-turn2",
158+
});
159+
160+
await writeChatSnapshot(sessionId, turn1);
161+
await writeChatSnapshot(sessionId, turn2);
162+
163+
const result = await readChatSnapshot(sessionId);
164+
expect(result).toEqual(turn2);
165+
expect(result?.messages).toHaveLength(4);
166+
expect(result?.lastOutEventId).toBe("evt-turn2");
167+
});
168+
169+
postgresAndMinioTest("isolates snapshots by sessionId (no cross-talk)", async ({ minioConfig }) => {
170+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
171+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
172+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
173+
env.OBJECT_STORE_REGION = minioConfig.region;
174+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
175+
176+
stubApiClient({ projectRef: "proj_snap_iso", envSlug: "dev" });
177+
178+
const sessA = "sess_iso_A";
179+
const sessB = "sess_iso_B";
180+
const snapA = makeSnapshot({ lastOutEventId: "evt-A" });
181+
const snapB = makeSnapshot({ lastOutEventId: "evt-B" });
182+
183+
await writeChatSnapshot(sessA, snapA);
184+
await writeChatSnapshot(sessB, snapB);
185+
186+
const readA = await readChatSnapshot(sessA);
187+
const readB = await readChatSnapshot(sessB);
188+
189+
expect(readA?.lastOutEventId).toBe("evt-A");
190+
expect(readB?.lastOutEventId).toBe("evt-B");
191+
// Distinct objects — modifying one shouldn't affect the other.
192+
expect(readA?.lastOutEventId).not.toBe(readB?.lastOutEventId);
193+
});
194+
195+
postgresAndMinioTest("handles snapshots with large message lists (~50 messages)", async ({ minioConfig }) => {
196+
// Stress test: a 50-turn chat snapshot. Plan F.4 mentions the
197+
// pre-change baseline grew past 512 KiB around turn 10-30 with tool
198+
// use; the post-slim wire keeps wire payloads small but the snapshot
199+
// itself can still get large. Verify the helpers handle a realistic
200+
// payload size.
201+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
202+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
203+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
204+
env.OBJECT_STORE_REGION = minioConfig.region;
205+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
206+
207+
stubApiClient({ projectRef: "proj_snap_big", envSlug: "dev" });
208+
209+
const messages: UIMessage[] = [];
210+
for (let i = 0; i < 50; i++) {
211+
messages.push({
212+
id: `u-${i}`,
213+
role: "user",
214+
parts: [{ type: "text", text: `user message ${i}: ${"x".repeat(200)}` }],
215+
});
216+
messages.push({
217+
id: `a-${i}`,
218+
role: "assistant",
219+
parts: [{ type: "text", text: `assistant reply ${i}: ${"y".repeat(500)}` }],
220+
});
221+
}
222+
const snapshot = makeSnapshot({ messages, lastOutEventId: "evt-50" });
223+
224+
await writeChatSnapshot("sess_big_chat", snapshot);
225+
const result = await readChatSnapshot("sess_big_chat");
226+
227+
expect(result).toBeDefined();
228+
expect(result!.messages).toHaveLength(100);
229+
expect(result!.lastOutEventId).toBe("evt-50");
230+
// Spot-check ordering integrity — the messages array round-tripped
231+
// in the same order.
232+
expect(result!.messages[0]!.id).toBe("u-0");
233+
expect(result!.messages[99]!.id).toBe("a-49");
234+
});
235+
});

0 commit comments

Comments
 (0)