Skip to content

Commit 08115ae

Browse files
committed
feat(chat): add preload support, dynamic tools, and preload-specific timeouts
- Add onPreload hook and preloaded field to all lifecycle events - Add transport.preload(chatId) for eagerly starting runs before first message - Add preloadWarmTimeoutInSeconds and preloadTimeout task options - Add preload:true run tag and chat.preloaded span attributes - Add UserTool model for per-user dynamic tools loaded from DB - Load dynamic tools in onPreload/onChatStart via chat.local - Build dynamicTool() instances in run and spread into streamText tools - Reference project: preload on new chat, dynamic company-info and user-preferences tools
1 parent cc21310 commit 08115ae

File tree

5 files changed

+310
-13
lines changed

5 files changed

+310
-13
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
300300
type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage, TMetadata = unknown> = {
301301
messages: TMessage[];
302302
chatId: string;
303-
trigger: "submit-message" | "regenerate-message";
303+
trigger: "submit-message" | "regenerate-message" | "preload";
304304
messageId?: string;
305305
metadata?: TMetadata;
306306
/** Whether this run is continuing an existing chat whose previous run ended. */
@@ -330,8 +330,9 @@ export type ChatTaskPayload<TClientData = unknown> = {
330330
* The trigger type:
331331
* - `"submit-message"`: A new user message
332332
* - `"regenerate-message"`: Regenerate the last assistant response
333+
* - `"preload"`: Run was preloaded before the first message (only on turn 0)
333334
*/
334-
trigger: "submit-message" | "regenerate-message";
335+
trigger: "submit-message" | "regenerate-message" | "preload";
335336

336337
/** The ID of the message to regenerate (only for `"regenerate-message"`) */
337338
messageId?: string;
@@ -343,6 +344,8 @@ export type ChatTaskPayload<TClientData = unknown> = {
343344
continuation: boolean;
344345
/** The run ID of the previous run (only set when `continuation` is true). */
345346
previousRunId?: string;
347+
/** Whether this run was preloaded before the first message. */
348+
preloaded: boolean;
346349
};
347350

348351
/**
@@ -510,6 +513,20 @@ async function pipeChat(
510513
* emits a control chunk and suspends via `messagesInput.wait()`. The frontend
511514
* transport resumes the same run by sending the next message via input streams.
512515
*/
516+
/**
517+
* Event passed to the `onPreload` callback.
518+
*/
519+
export type PreloadEvent<TClientData = unknown> = {
520+
/** The unique identifier for the chat session. */
521+
chatId: string;
522+
/** The Trigger.dev run ID for this conversation. */
523+
runId: string;
524+
/** A scoped access token for this chat run. */
525+
chatAccessToken: string;
526+
/** Custom data from the frontend. */
527+
clientData?: TClientData;
528+
};
529+
513530
/**
514531
* Event passed to the `onChatStart` callback.
515532
*/
@@ -528,6 +545,8 @@ export type ChatStartEvent<TClientData = unknown> = {
528545
continuation: boolean;
529546
/** The run ID of the previous run (only set when `continuation` is true). */
530547
previousRunId?: string;
548+
/** Whether this run was preloaded before the first message. */
549+
preloaded: boolean;
531550
};
532551

533552
/**
@@ -552,6 +571,8 @@ export type TurnStartEvent<TClientData = unknown> = {
552571
continuation: boolean;
553572
/** The run ID of the previous run (only set when `continuation` is true). */
554573
previousRunId?: string;
574+
/** Whether this run was preloaded before the first message. */
575+
preloaded: boolean;
555576
};
556577

557578
/**
@@ -601,6 +622,8 @@ export type TurnCompleteEvent<TClientData = unknown> = {
601622
continuation: boolean;
602623
/** The run ID of the previous run (only set when `continuation` is true). */
603624
previousRunId?: string;
625+
/** Whether this run was preloaded before the first message. */
626+
preloaded: boolean;
604627
};
605628

606629
export type ChatTaskOptions<
@@ -638,6 +661,22 @@ export type ChatTaskOptions<
638661
*/
639662
run: (payload: ChatTaskRunPayload<inferSchemaOut<TClientDataSchema>>) => Promise<unknown>;
640663

664+
/**
665+
* Called when a preloaded run starts, before the first message arrives.
666+
*
667+
* Use this to initialize state, create DB records, and load context early —
668+
* so everything is ready when the user's first message comes through.
669+
*
670+
* @example
671+
* ```ts
672+
* onPreload: async ({ chatId, clientData }) => {
673+
* await db.chat.create({ data: { id: chatId } });
674+
* userContext.init(await loadUser(clientData.userId));
675+
* }
676+
* ```
677+
*/
678+
onPreload?: (event: PreloadEvent<inferSchemaOut<TClientDataSchema>>) => Promise<void> | void;
679+
641680
/**
642681
* Called on the first turn (turn 0) of a new run, before the `run` function executes.
643682
*
@@ -722,6 +761,26 @@ export type ChatTaskOptions<
722761
* @default "1h"
723762
*/
724763
chatAccessTokenTTL?: string;
764+
765+
/**
766+
* How long (in seconds) to keep the run warm after `onPreload` fires,
767+
* waiting for the first message before suspending.
768+
*
769+
* Only applies to preloaded runs (triggered via `transport.preload()`).
770+
*
771+
* @default Same as `warmTimeoutInSeconds`
772+
*/
773+
preloadWarmTimeoutInSeconds?: number;
774+
775+
/**
776+
* How long to wait (suspended) for the first message after a preloaded run starts.
777+
* If no message arrives within this time, the run ends.
778+
*
779+
* Only applies to preloaded runs.
780+
*
781+
* @default Same as `turnTimeout`
782+
*/
783+
preloadTimeout?: string;
725784
};
726785

727786
/**
@@ -760,13 +819,16 @@ function chatTask<
760819
const {
761820
run: userRun,
762821
clientDataSchema,
822+
onPreload,
763823
onChatStart,
764824
onTurnStart,
765825
onTurnComplete,
766826
maxTurns = 100,
767827
turnTimeout = "1h",
768828
warmTimeoutInSeconds = 30,
769829
chatAccessTokenTTL = "1h",
830+
preloadWarmTimeoutInSeconds,
831+
preloadTimeout,
770832
...restOptions
771833
} = options;
772834

@@ -786,6 +848,7 @@ function chatTask<
786848
let currentWirePayload = payload;
787849
const continuation = payload.continuation ?? false;
788850
const previousRunId = payload.previousRunId;
851+
const preloaded = payload.trigger === "preload";
789852

790853
// Accumulated model messages across turns. Turn 1 initialises from the
791854
// full history the frontend sends; subsequent turns append only the new
@@ -806,6 +869,96 @@ function chatTask<
806869
});
807870

808871
try {
872+
// Handle preloaded runs — fire onPreload, then wait for the first real message
873+
if (preloaded) {
874+
if (activeSpan) {
875+
activeSpan.setAttribute("chat.preloaded", true);
876+
}
877+
878+
const currentRunId = taskContext.ctx?.run.id ?? "";
879+
let preloadAccessToken = "";
880+
if (currentRunId) {
881+
try {
882+
preloadAccessToken = await auth.createPublicToken({
883+
scopes: {
884+
read: { runs: currentRunId },
885+
write: { inputStreams: currentRunId },
886+
},
887+
expirationTime: chatAccessTokenTTL,
888+
});
889+
} catch {
890+
// Token creation failed
891+
}
892+
}
893+
894+
// Parse client data for the preload hook
895+
const preloadClientData = (parseClientData
896+
? await parseClientData(payload.metadata)
897+
: payload.metadata) as inferSchemaOut<TClientDataSchema>;
898+
899+
// Fire onPreload hook
900+
if (onPreload) {
901+
await tracer.startActiveSpan(
902+
"onPreload()",
903+
async () => {
904+
await onPreload({
905+
chatId: payload.chatId,
906+
runId: currentRunId,
907+
chatAccessToken: preloadAccessToken,
908+
clientData: preloadClientData,
909+
});
910+
},
911+
{
912+
attributes: {
913+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onStart",
914+
[SemanticInternalAttributes.COLLAPSED]: true,
915+
"chat.id": payload.chatId,
916+
"chat.preloaded": true,
917+
},
918+
}
919+
);
920+
}
921+
922+
// Wait for the first real message — use preload-specific timeouts if configured
923+
const effectivePreloadWarmTimeout =
924+
(metadata.get(WARM_TIMEOUT_METADATA_KEY) as number | undefined)
925+
?? preloadWarmTimeoutInSeconds
926+
?? warmTimeoutInSeconds;
927+
928+
let firstMessage: ChatTaskWirePayload | undefined;
929+
930+
if (effectivePreloadWarmTimeout > 0) {
931+
const warm = await messagesInput.once({
932+
timeoutMs: effectivePreloadWarmTimeout * 1000,
933+
spanName: "preload wait (warm)",
934+
});
935+
936+
if (warm.ok) {
937+
firstMessage = warm.output;
938+
}
939+
}
940+
941+
if (!firstMessage) {
942+
const effectivePreloadTimeout =
943+
(metadata.get(TURN_TIMEOUT_METADATA_KEY) as string | undefined)
944+
?? preloadTimeout
945+
?? turnTimeout;
946+
947+
const suspended = await messagesInput.wait({
948+
timeout: effectivePreloadTimeout,
949+
spanName: "preload wait (suspended)",
950+
});
951+
952+
if (!suspended.ok) {
953+
return; // Timed out waiting for first message — end run
954+
}
955+
956+
firstMessage = suspended.output;
957+
}
958+
959+
currentWirePayload = firstMessage;
960+
}
961+
809962
for (let turn = 0; turn < maxTurns; turn++) {
810963
// Extract turn-level context before entering the span
811964
const { metadata: wireMetadata, messages: uiMessages, ...restWire } = currentWirePayload;
@@ -947,6 +1100,7 @@ function chatTask<
9471100
chatAccessToken: turnAccessToken,
9481101
continuation,
9491102
previousRunId,
1103+
preloaded,
9501104
});
9511105
},
9521106
{
@@ -956,6 +1110,7 @@ function chatTask<
9561110
"chat.id": currentWirePayload.chatId,
9571111
"chat.messages.count": accumulatedMessages.length,
9581112
"chat.continuation": continuation,
1113+
"chat.preloaded": preloaded,
9591114
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
9601115
},
9611116
}
@@ -978,6 +1133,7 @@ function chatTask<
9781133
clientData,
9791134
continuation,
9801135
previousRunId,
1136+
preloaded,
9811137
});
9821138
},
9831139
{
@@ -989,6 +1145,7 @@ function chatTask<
9891145
"chat.messages.count": accumulatedMessages.length,
9901146
"chat.trigger": currentWirePayload.trigger,
9911147
"chat.continuation": continuation,
1148+
"chat.preloaded": preloaded,
9921149
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
9931150
},
9941151
}
@@ -1014,6 +1171,7 @@ function chatTask<
10141171
clientData,
10151172
continuation,
10161173
previousRunId,
1174+
preloaded,
10171175
signal: combinedSignal,
10181176
cancelSignal,
10191177
stopSignal,
@@ -1119,6 +1277,7 @@ function chatTask<
11191277
stopped: wasStopped,
11201278
continuation,
11211279
previousRunId,
1280+
preloaded,
11221281
});
11231282
},
11241283
{
@@ -1129,6 +1288,7 @@ function chatTask<
11291288
"chat.turn": turn + 1,
11301289
"chat.stopped": wasStopped,
11311290
"chat.continuation": continuation,
1291+
"chat.preloaded": preloaded,
11321292
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
11331293
"chat.messages.count": accumulatedMessages.length,
11341294
"chat.response.parts.count": capturedResponseMessage?.parts?.length ?? 0,

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ export type TriggerChatTransportOptions<TClientData = unknown> = {
183183
/** Priority (lower = higher priority). */
184184
priority?: number;
185185
};
186+
186187
};
187188

188189
/**
@@ -451,6 +452,62 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
451452
this._onSessionChange = callback;
452453
}
453454

455+
/**
456+
* Eagerly trigger a run for a chat before the first message is sent.
457+
* This allows initialization (DB setup, context loading) to happen
458+
* while the user is still typing, reducing first-response latency.
459+
*
460+
* The task's `onPreload` hook fires immediately. The run then waits
461+
* for the first message via input stream. When `sendMessages` is called
462+
* later, it detects the existing session and sends via input stream
463+
* instead of triggering a new run.
464+
*
465+
* No-op if a session already exists for this chatId.
466+
*/
467+
async preload(chatId: string): Promise<void> {
468+
// Don't preload if session already exists
469+
if (this.sessions.get(chatId)?.runId) return;
470+
471+
const payload = {
472+
messages: [] as never[],
473+
chatId,
474+
trigger: "preload" as const,
475+
metadata: this.defaultMetadata,
476+
};
477+
478+
const currentToken = await this.resolveAccessToken();
479+
const apiClient = new ApiClient(this.baseURL, currentToken);
480+
481+
const autoTags = [`chat:${chatId}`, "preload:true"];
482+
const userTags = this.triggerOptions?.tags ?? [];
483+
const tags = [...autoTags, ...userTags].slice(0, 5);
484+
485+
const triggerResponse = await apiClient.triggerTask(this.taskId, {
486+
payload,
487+
options: {
488+
payloadType: "application/json",
489+
tags,
490+
queue: this.triggerOptions?.queue ? { name: this.triggerOptions.queue } : undefined,
491+
maxAttempts: this.triggerOptions?.maxAttempts,
492+
machine: this.triggerOptions?.machine,
493+
priority: this.triggerOptions?.priority,
494+
},
495+
});
496+
497+
const runId = triggerResponse.id;
498+
const publicAccessToken =
499+
"publicAccessToken" in triggerResponse
500+
? (triggerResponse as { publicAccessToken?: string }).publicAccessToken
501+
: undefined;
502+
503+
const newSession: ChatSessionState = {
504+
runId,
505+
publicAccessToken: publicAccessToken ?? currentToken,
506+
};
507+
this.sessions.set(chatId, newSession);
508+
this.notifySessionChange(chatId, newSession);
509+
}
510+
454511
private notifySessionChange(
455512
chatId: string,
456513
session: ChatSessionState | null

0 commit comments

Comments
 (0)