Commit 6052e63

fix: update visibility manager release/reclaim to use dispatch indexes
The releaseMessage and releaseMessageBatch Lua scripts were still writing to the old master queue shards. Updated them to write to the new dispatch indexes instead, so released/reclaimed messages go into the new two-level index atomically. Also removed the legacyDrainComplete flag in favor of checking ZCARD on each iteration (O(1)), and removed the redundant legacyDrainComplete otel metric since master_queue.length already shows drain status.
1 parent e1604f7 commit 6052e63

File tree

3 files changed: +302 −93 lines

.scratch/ai-chat-overview.md

Lines changed: 207 additions & 0 deletions (new file)
# AI SDK Chat Transport & Chat Task System

Run AI chat completions as durable Trigger.dev tasks — with built-in realtime streaming, multi-turn conversations in a single run, typed per-run state, cancellation from the frontend, and tool support. No API routes needed.

## How it works

1. Frontend sends messages via AI SDK's `useChat` hook through `TriggerChatTransport`
2. Transport triggers a Trigger.dev task with the conversation as payload
3. Task streams `UIMessageChunk` events back via realtime streams
4. AI SDK processes the stream natively — text, tool calls, reasoning, everything
5. Frontend can cancel generation mid-stream — the transport sends a cancel signal via input streams and `chat.task` aborts `streamText` automatically

```
useChat → TriggerChatTransport → Trigger.dev Task → streamText → realtime stream → useChat
   ↑ cancel                                              ↓ abort
   └──────────── input stream ("cancel") ────────────────┘
```

## Backend: `chat.task`

Define a chat task in one function. Return a `streamText` result and it's automatically piped to the frontend.

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = chat.task({
  id: "my-chat",
  run: async ({ modelMessages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages: modelMessages,
      abortSignal: signal, // enables frontend cancellation
    });
  },
});
```

No `convertToModelMessages` call needed — `chat.task` handles the conversion and passes both `modelMessages` (for the model) and `messages` (raw `UIMessage[]`) in the payload. The `signal` is an `AbortSignal` that fires when the frontend cancels generation.

## Frontend: `useTriggerChatTransport`

A React hook that creates a type-safe transport for `useChat`. No `useMemo` needed — the hook handles memoization internally.

```tsx
import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
import type { myChat } from "@/trigger/chat";

function Chat() {
  const transport = useTriggerChatTransport<typeof myChat>({
    task: "my-chat",
    accessToken: getChatToken, // async function for token refresh
  });

  const { messages, sendMessage, stop, status } = useChat({ transport });

  // stop() cancels the in-flight generation — chat.task aborts streamText automatically
}
```

The `<typeof myChat>` generic gives compile-time validation of the task ID string.

Cancellation just works — calling `stop()` from `useChat` sends a cancel signal via an input stream to the running task. `chat.task` listens for it and aborts the `streamText` call. No extra wiring needed.

## Single-run mode (multi-turn conversations)

`chat.task` keeps the entire conversation inside a single run using waitpoint tokens. After each AI response, the run pauses until the next message arrives — then resumes in the same process.

- All turns share the same run ID, logs, and metadata
- In-memory state persists across turns without external storage
- The full conversation is observable as one run in the dashboard

```ts
export const myChat = chat.task({
  id: "my-chat",
  maxTurns: 50, // default: 100
  turnTimeout: "30m", // default: "1h"
  run: async ({ modelMessages, signal }) => { ... },
});
```
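The pause/resume behavior can be modeled with a generator: the run suspends after each response (like a waitpoint) and resumes when the next message is sent in. This is a conceptual sketch only — names and mechanics here are illustrative, not the SDK's implementation:

```typescript
// Conceptual model: a generator stands in for the waitpoint-based run.
// Sending a value resumes the "run"; running out of messages ends it.
function* chatRun(maxTurns = 100): Generator<string, number, string | undefined> {
  let turn = 0;
  let reply = "ready";
  while (turn < maxTurns) {
    const next = yield reply;      // suspend: wait for the next message
    if (next === undefined) break; // turn timeout / conversation ended
    reply = `echo: ${next}`;       // stand-in for the AI response
    turn++;
  }
  return turn; // total turns, all observable as one "run"
}

const run = chatRun(50);
run.next(); // start: suspends at the first waitpoint
const r1 = run.next("hello").value; // "echo: hello"
const r2 = run.next("again").value; // "echo: again"
const done = run.next();            // no next message, so the run completes
console.log(r1, r2, done.value);    // echo: hello echo: again 2
```

The key property the sketch illustrates: all turns execute inside one call frame, so local variables (like `turn` above) survive between messages without external storage.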
83+
84+
## Per-run state with `chat.state`
85+
86+
Define typed, per-run state that's accessible from anywhere during task execution — tools, the run function, nested helpers. Each conversation gets its own isolated copy.
87+
88+
```ts
89+
import { chat } from "@trigger.dev/sdk/ai";
90+
import { streamText, tool } from "ai";
91+
import { openai } from "@ai-sdk/openai";
92+
import { z } from "zod";
93+
94+
const state = chat.state({
95+
init: () => ({ score: 0, questionsAsked: 0, streak: 0 }),
96+
});
97+
98+
// Tools at module level — access state directly
99+
const checkAnswer = tool({
100+
description: "Check the user's answer",
101+
inputSchema: z.object({ correct: z.boolean() }),
102+
execute: async ({ correct }) => {
103+
state.questionsAsked++;
104+
if (correct) { state.score++; state.streak++; }
105+
else { state.streak = 0; }
106+
return { score: state.score, total: state.questionsAsked };
107+
},
108+
});
109+
110+
export const quiz = chat.task({
111+
id: "quiz-bot",
112+
state,
113+
run: async ({ modelMessages, signal }) => {
114+
return streamText({
115+
model: openai("gpt-4o-mini"),
116+
system: `Score: ${state.score}/${state.questionsAsked}`,
117+
messages: modelMessages,
118+
tools: { checkAnswer },
119+
maxSteps: 5,
120+
abortSignal: signal,
121+
});
122+
},
123+
});
124+
```
125+
126+
State is backed by a Proxy over locals — no globals, fully isolated per run. Supports optional `persist` callback for external storage:
127+
128+
```ts
129+
const state = chat.state({
130+
init: () => ({ preferences: [] }),
131+
persist: async ({ state, chatId }) => {
132+
await db.sessions.upsert({ where: { chatId }, data: state });
133+
},
134+
persistDebounceMs: 1000, // debounce rapid mutations
135+
});
136+
```
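The "Proxy over locals" idea can be illustrated with a minimal sketch built on Node's `AsyncLocalStorage`. Everything below is an assumption for illustration — the SDK's actual internals may differ:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Sketch: a module-level `state` object whose property reads/writes are
// forwarded by a Proxy to whichever store is active in the current run.
function createRunState<T extends object>(init: () => T) {
  const storage = new AsyncLocalStorage<T>();
  const state = new Proxy({} as T, {
    get: (_, prop) => Reflect.get(storage.getStore()!, prop),
    set: (_, prop, value) => Reflect.set(storage.getStore()!, prop, value),
  });
  // Each run() enters a fresh context with a fresh copy of the state
  const run = <R>(fn: () => R): R => storage.run(init(), fn);
  return { state, run };
}

const { state, run } = createRunState(() => ({ score: 0 }));

// Two separate "runs" mutate state without seeing each other's values
const a = run(() => { state.score += 2; return state.score; });
const b = run(() => { state.score += 5; return state.score; });
console.log(a, b); // 2 5
```

Because the store lives in the async context rather than a global, isolation holds even when runs interleave — which is why module-level tools can read `state` safely.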
137+
138+
## AI SDK tools
139+
140+
Tools work through the full pipeline. Tool calls and results stream to the frontend and appear in `message.parts`.
141+
142+
```ts
143+
export const myChat = chat.task({
144+
id: "my-chat",
145+
run: async ({ modelMessages, signal }) => {
146+
return streamText({
147+
model: openai("gpt-4o"),
148+
messages: modelMessages,
149+
tools: { weather: weatherTool },
150+
maxSteps: 5,
151+
abortSignal: signal,
152+
});
153+
},
154+
});
155+
```
156+
157+
## `chat.pipe` for complex flows
158+
159+
For agent loops where `streamText` is called deep in your code, use `chat.pipe` instead of returning the result:
160+
161+
```ts
162+
import { chat } from "@trigger.dev/sdk/ai";
163+
164+
export const agent = chat.task({
165+
id: "agent-chat",
166+
run: async ({ modelMessages, signal }) => {
167+
await runAgentLoop(modelMessages, signal);
168+
// Don't return — chat.pipe handles streaming
169+
},
170+
});
171+
172+
async function runAgentLoop(messages: ModelMessage[], signal: AbortSignal) {
173+
const result = streamText({ model, messages, abortSignal: signal });
174+
await chat.pipe(result); // works from anywhere inside the task
175+
}
176+
```
177+
178+
## Cancellation
179+
180+
Frontend cancellation flows through input streams to an `AbortSignal` provided in the run payload:
181+
182+
1. User clicks stop (or calls `stop()` from `useChat`)
183+
2. `TriggerChatTransport` sends a cancel signal via an input stream to the running task
184+
3. `chat.task` receives the signal and aborts the `signal` passed to your `run` function
185+
4. `streamText` stops generating — `useChat` shows the partial response
186+
187+
Just pass `signal` to `abortSignal` on `streamText` and cancellation works end-to-end. No manual abort controller wiring.
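Because `signal` is a standard `AbortSignal`, the usual Web API patterns also apply if you need extra cleanup alongside `streamText`. A small sketch — the `onCancel` helper is hypothetical, not part of the SDK:

```typescript
// Hypothetical helper: run cleanup exactly once when the run is cancelled.
function onCancel(signal: AbortSignal, cleanup: () => void) {
  if (signal.aborted) cleanup(); // already cancelled before we subscribed
  else signal.addEventListener("abort", cleanup, { once: true });
}

// Simulate the flow: the frontend's stop() ultimately aborts the run's signal
const controller = new AbortController();
let cleanups = 0;
onCancel(controller.signal, () => { cleanups++; });
controller.abort(); // stand-in for the transport's cancel signal
console.log(controller.signal.aborted, cleanups); // true 1
```

Inside a real `run` function you would call something like `onCancel(signal, flushPartialResults)` next to the `streamText` call; the `{ once: true }` option keeps the cleanup from firing twice.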
## Type-safe access tokens

```ts
// Server action
import { chat } from "@trigger.dev/sdk/ai";
import type { myChat } from "@/trigger/chat";

export const getChatToken = () => chat.createAccessToken<typeof myChat>("my-chat");
```

## Package imports

| Import | Package |
|--------|---------|
| `chat.task`, `chat.state`, `chat.pipe`, `chat.createAccessToken` | `@trigger.dev/sdk/ai` |
| `TriggerChatTransport` | `@trigger.dev/sdk/chat` |
| `useTriggerChatTransport` | `@trigger.dev/sdk/chat/react` |

Requires `ai` package v6+.

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 29 additions & 62 deletions

```diff
@@ -1094,7 +1094,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   ): Promise<number> {
     const queueKey = this.keys.queueKey(queueId);
     const queueItemsKey = this.keys.queueItemsKey(queueId);
-    const masterQueueKey = this.keys.masterQueueKey(shardId);
     const descriptor = this.queueDescriptorCache.get(queueId) ?? {
       id: queueId,
       tenantId,
@@ -1153,12 +1152,16 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
       if (!reserved) {
         // Release ALL remaining messages (from index i onward) back to queue
         // This prevents messages from being stranded in the in-flight set
+        const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId);
+        const dispatchKey = this.keys.dispatchKey(shardId);
         await this.visibilityManager.releaseBatch(
           claimedMessages.slice(i),
           queueId,
           queueKey,
           queueItemsKey,
-          masterQueueKey
+          tenantQueueIndexKey,
+          dispatchKey,
+          tenantId
         );
         // Stop processing more messages from this queue since we're at capacity
         break;
@@ -1293,7 +1296,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     const shardId = this.masterQueue.getShardForQueue(queueId);
     const queueKey = this.keys.queueKey(queueId);
     const queueItemsKey = this.keys.queueItemsKey(queueId);
-    const masterQueueKey = this.keys.masterQueueKey(shardId);
     const inflightDataKey = this.keys.inflightDataKey(shardId);

     // Get stored message for concurrency release
@@ -1315,13 +1317,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         }
       : { id: queueId, tenantId: "", metadata: {} };

-    // Release back to queue (visibility manager updates old master queue internally)
+    // Release back to queue (visibility manager updates dispatch indexes atomically)
+    const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
+    const dispatchKey = this.keys.dispatchKey(shardId);
     await this.visibilityManager.release(
       messageId,
       queueId,
       queueKey,
       queueItemsKey,
-      masterQueueKey,
+      tenantQueueIndexKey,
+      dispatchKey,
+      descriptor.tenantId,
       Date.now() // Put at back of queue
     );

@@ -1330,17 +1336,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
       await this.concurrencyManager.release(descriptor, messageId);
     }

-    // Update new dispatch indexes (message is back in queue, update scores)
-    const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
-    const dispatchKey = this.keys.dispatchKey(shardId);
-    await this.redis.updateDispatchIndexes(
-      queueKey,
-      tenantQueueIndexKey,
-      dispatchKey,
-      queueId,
-      descriptor.tenantId
-    );
-
     this.logger.debug("Message released", {
       messageId,
       queueId,
@@ -1359,7 +1354,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     const shardId = this.masterQueue.getShardForQueue(queueId);
     const queueKey = this.keys.queueKey(queueId);
     const queueItemsKey = this.keys.queueItemsKey(queueId);
-    const masterQueueKey = this.keys.masterQueueKey(shardId);
     const inflightDataKey = this.keys.inflightDataKey(shardId);

     // Get stored message
@@ -1391,7 +1385,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
       queueId,
       queueKey,
       queueItemsKey,
-      masterQueueKey,
       shardId,
       descriptor,
       error
@@ -1407,7 +1400,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     queueId: string,
     queueKey: string,
     queueItemsKey: string,
-    masterQueueKey: string,
     shardId: number,
     descriptor: QueueDescriptor,
     error?: Error
@@ -1427,12 +1419,16 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {

     // Release with delay, passing the updated message data so the Lua script
     // atomically writes the incremented attempt count when re-queuing.
+    const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
+    const dispatchKey = this.keys.dispatchKey(shardId);
     await this.visibilityManager.release(
       storedMessage.id,
       queueId,
       queueKey,
       queueItemsKey,
-      masterQueueKey,
+      tenantQueueIndexKey,
+      dispatchKey,
+      descriptor.tenantId,
       Date.now() + nextDelay,
       JSON.stringify(updatedMessage)
     );
@@ -1442,17 +1438,6 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
       await this.concurrencyManager.release(descriptor, storedMessage.id);
     }

-    // Update dispatch indexes (message is back in queue with delay)
-    const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId);
-    const dispatchKey = this.keys.dispatchKey(shardId);
-    await this.redis.updateDispatchIndexes(
-      queueKey,
-      tenantQueueIndexKey,
-      dispatchKey,
-      queueId,
-      descriptor.tenantId
-    );
-
     this.telemetry.recordRetry();

     this.logger.debug("Message scheduled for retry", {
@@ -1550,11 +1535,17 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     let totalReclaimed = 0;

     for (let shardId = 0; shardId < this.shardCount; shardId++) {
-      const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
-        queueKey: this.keys.queueKey(queueId),
-        queueItemsKey: this.keys.queueItemsKey(queueId),
-        masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
-      }));
+      const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => {
+        const tenantId = this.keys.extractTenantId(queueId);
+        const queueShardId = this.masterQueue.getShardForQueue(queueId);
+        return {
+          queueKey: this.keys.queueKey(queueId),
+          queueItemsKey: this.keys.queueItemsKey(queueId),
+          tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId),
+          dispatchKey: this.keys.dispatchKey(queueShardId),
+          tenantId,
+        };
+      });

       if (reclaimedMessages.length > 0) {
         // Release concurrency for all reclaimed messages in a single batch
@@ -1580,32 +1571,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         }
       }

-      // Update dispatch indexes for reclaimed queues (messages are back in queue)
-      const updatedQueues = new Set<string>();
-      for (const msg of reclaimedMessages) {
-        const key = `${msg.tenantId}:${msg.queueId}`;
-        if (updatedQueues.has(key)) continue;
-        updatedQueues.add(key);
-
-        try {
-          const queueKey = this.keys.queueKey(msg.queueId);
-          const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(msg.tenantId);
-          const dispatchKey = this.keys.dispatchKey(shardId);
-          await this.redis.updateDispatchIndexes(
-            queueKey,
-            tenantQueueIndexKey,
-            dispatchKey,
-            msg.queueId,
-            msg.tenantId
-          );
-        } catch (error) {
-          this.logger.error("Failed to update dispatch indexes for reclaimed message", {
-            queueId: msg.queueId,
-            tenantId: msg.tenantId,
-            error: error instanceof Error ? error.message : String(error),
-          });
-        }
-      }
+      // Dispatch indexes are updated atomically by the releaseMessage Lua script
+      // inside reclaimTimedOut, so no separate index update needed here.
     }

     totalReclaimed += reclaimedMessages.length;
```
