Skip to content

Commit c804a98

Browse files
committed
Added better telemetry support to view turns
1 parent 03ba2ba commit c804a98

File tree

3 files changed

+184
-80
lines changed

3 files changed

+184
-80
lines changed

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ export type PipeStreamOptions = {
7171
* Additional request options for the API call.
7272
*/
7373
requestOptions?: ApiRequestOptions;
74+
/** Override the default span name for this operation. */
75+
spanName?: string;
76+
/** When true, the span will be collapsed in the dashboard. */
77+
collapsed?: boolean;
7478
};
7579

7680
/**
@@ -199,6 +203,8 @@ export type InputStreamSubscription = {
199203
export type InputStreamOnceOptions = {
200204
signal?: AbortSignal;
201205
timeoutMs?: number;
206+
/** Override the default span name for this operation. */
207+
spanName?: string;
202208
};
203209

204210
export type SendInputStreamOptions = {
@@ -234,6 +240,9 @@ export type InputStreamWaitOptions = {
234240
* and filtering waitpoints via `wait.listTokens()`.
235241
*/
236242
tags?: string[];
243+
244+
/** Override the default span name for this operation. */
245+
spanName?: string;
237246
};
238247

239248
export type InferInputStreamType<T> = T extends RealtimeDefinedInputStream<infer TData>

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 170 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import {
2+
accessoryAttributes,
23
AnyTask,
34
isSchemaZodEsque,
5+
SemanticInternalAttributes,
46
Task,
57
type inferSchemaIn,
68
type PipeStreamOptions,
@@ -11,10 +13,12 @@ import {
1113
} from "@trigger.dev/core/v3";
1214
import type { ModelMessage, UIMessage } from "ai";
1315
import { convertToModelMessages, dynamicTool, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
16+
import { type Attributes, trace } from "@opentelemetry/api";
1417
import { auth } from "./auth.js";
1518
import { metadata } from "./metadata.js";
1619
import { streams } from "./streams.js";
1720
import { createTask } from "./shared.js";
21+
import { tracer } from "./tracer.js";
1822
import {
1923
CHAT_STREAM_KEY as _CHAT_STREAM_KEY,
2024
CHAT_MESSAGES_STREAM_ID,
@@ -281,6 +285,9 @@ export type PipeChatOptions = {
281285
* @default "self" (current run)
282286
*/
283287
target?: string;
288+
289+
/** Override the default span name for this operation. */
290+
spanName?: string;
284291
};
285292

286293
/**
@@ -373,6 +380,9 @@ async function pipeChat(
373380
if (options?.target) {
374381
pipeOptions.target = options.target;
375382
}
383+
if (options?.spanName) {
384+
pipeOptions.spanName = options.spanName;
385+
}
376386

377387
const { waitUntilComplete } = streams.pipe(streamKey, stream, pipeOptions);
378388
await waitUntilComplete();
@@ -478,6 +488,12 @@ function chatTask<TIdentifier extends string>(
478488
return createTask<TIdentifier, ChatTaskWirePayload, unknown>({
479489
...restOptions,
480490
run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
491+
// Set gen_ai.conversation.id on the run-level span for dashboard context
492+
const activeSpan = trace.getActiveSpan();
493+
if (activeSpan) {
494+
activeSpan.setAttribute("gen_ai.conversation.id", payload.chatId);
495+
}
496+
481497
let currentWirePayload = payload;
482498

483499
// Mutable reference to the current turn's stop controller so the
@@ -491,92 +507,142 @@ function chatTask<TIdentifier extends string>(
491507

492508
try {
493509
for (let turn = 0; turn < maxTurns; turn++) {
494-
_chatPipeCount = 0;
510+
// Extract turn-level context before entering the span
511+
const { metadata: wireMetadata, messages: uiMessages, ...restWire } = currentWirePayload;
512+
const lastUserMessage = extractLastUserMessageText(uiMessages);
513+
514+
const turnAttributes: Attributes = {
515+
"turn.number": turn + 1,
516+
"gen_ai.conversation.id": currentWirePayload.chatId,
517+
"gen_ai.operation.name": "chat",
518+
"chat.trigger": currentWirePayload.trigger,
519+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-message-chatbot",
520+
[SemanticInternalAttributes.ENTITY_TYPE]: "chat-turn",
521+
};
522+
523+
if (lastUserMessage) {
524+
turnAttributes["chat.user_message"] = lastUserMessage;
525+
526+
// Show a truncated preview of the user message as an accessory
527+
const preview =
528+
lastUserMessage.length > 80
529+
? lastUserMessage.slice(0, 80) + "..."
530+
: lastUserMessage;
531+
Object.assign(
532+
turnAttributes,
533+
accessoryAttributes({
534+
items: [{ text: preview, variant: "normal" }],
535+
style: "codepath",
536+
})
537+
);
538+
}
495539

496-
// Per-turn stop controller (reset each turn)
497-
const stopController = new AbortController();
498-
currentStopController = stopController;
540+
if (wireMetadata !== undefined) {
541+
turnAttributes["chat.client_data"] =
542+
typeof wireMetadata === "string" ? wireMetadata : JSON.stringify(wireMetadata);
543+
}
499544

500-
// Three signals for the user's run function
501-
const stopSignal = stopController.signal;
502-
const cancelSignal = runSignal;
503-
const combinedSignal = AbortSignal.any([runSignal, stopController.signal]);
545+
const turnResult = await tracer.startActiveSpan(
546+
`chat turn ${turn + 1}`,
547+
async () => {
548+
_chatPipeCount = 0;
549+
550+
// Per-turn stop controller (reset each turn)
551+
const stopController = new AbortController();
552+
currentStopController = stopController;
553+
554+
// Three signals for the user's run function
555+
const stopSignal = stopController.signal;
556+
const cancelSignal = runSignal;
557+
const combinedSignal = AbortSignal.any([runSignal, stopController.signal]);
558+
559+
// Buffer messages that arrive during streaming
560+
const pendingMessages: ChatTaskWirePayload[] = [];
561+
const msgSub = messagesInput.on((msg) => {
562+
pendingMessages.push(msg);
563+
});
564+
565+
// Convert wire payload to user-facing payload
566+
const sanitized = sanitizeMessages(uiMessages);
567+
const modelMessages = await convertToModelMessages(sanitized);
568+
569+
try {
570+
const result = await userRun({
571+
...restWire,
572+
messages: modelMessages,
573+
uiMessages: sanitized,
574+
clientData: wireMetadata,
575+
signal: combinedSignal,
576+
cancelSignal,
577+
stopSignal,
578+
});
579+
580+
// Auto-pipe if the run function returned a StreamTextResult or similar,
581+
// but only if pipeChat() wasn't already called manually during this turn
582+
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
583+
await pipeChat(result, { signal: combinedSignal, spanName: "stream response" });
584+
}
585+
} catch (error) {
586+
// Handle AbortError from streamText gracefully
587+
if (error instanceof Error && error.name === "AbortError") {
588+
if (runSignal.aborted) {
589+
return "exit"; // Full run cancellation — exit
590+
}
591+
// Stop generation — fall through to continue the loop
592+
} else {
593+
throw error;
594+
}
595+
} finally {
596+
msgSub.off();
597+
}
504598

505-
// Buffer messages that arrive during streaming
506-
const pendingMessages: ChatTaskWirePayload[] = [];
507-
const msgSub = messagesInput.on((msg) => {
508-
pendingMessages.push(msg);
509-
});
599+
if (runSignal.aborted) return "exit";
510600

511-
// Convert wire payload to user-facing payload
512-
const { metadata: wireMetadata, messages: uiMessages, ...restWire } = currentWirePayload;
513-
const sanitized = sanitizeMessages(uiMessages);
514-
const modelMessages = await convertToModelMessages(sanitized);
515-
516-
try {
517-
const result = await userRun({
518-
...restWire,
519-
messages: modelMessages,
520-
uiMessages: sanitized,
521-
clientData: wireMetadata,
522-
signal: combinedSignal,
523-
cancelSignal,
524-
stopSignal,
525-
});
526-
527-
// Auto-pipe if the run function returned a StreamTextResult or similar,
528-
// but only if pipeChat() wasn't already called manually during this turn
529-
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
530-
await pipeChat(result, { signal: combinedSignal });
531-
}
532-
} catch (error) {
533-
// Handle AbortError from streamText gracefully
534-
if (error instanceof Error && error.name === "AbortError") {
535-
if (runSignal.aborted) {
536-
return; // Full run cancellation — exit
601+
// Write turn-complete control chunk so frontend closes its stream
602+
await writeTurnCompleteChunk(currentWirePayload.chatId);
603+
604+
// If messages arrived during streaming, use the first one immediately
605+
if (pendingMessages.length > 0) {
606+
currentWirePayload = pendingMessages[0]!;
607+
return "continue";
537608
}
538-
// Stop generation — fall through to continue the loop
539-
} else {
540-
throw error;
541-
}
542-
} finally {
543-
msgSub.off();
544-
}
545609

546-
if (runSignal.aborted) return;
610+
// Phase 1: Keep the run warm for quick response to the next message.
611+
// The run stays active (using compute) during this window.
612+
if (warmTimeoutInSeconds > 0) {
613+
const warm = await messagesInput.once({
614+
timeoutMs: warmTimeoutInSeconds * 1000,
615+
spanName: "waiting (warm)",
616+
});
617+
618+
if (warm.ok) {
619+
// Message arrived while warm — respond instantly
620+
currentWirePayload = warm.output;
621+
return "continue";
622+
}
623+
}
547624

548-
// Write turn-complete control chunk so frontend closes its stream
549-
await writeTurnCompleteChunk();
625+
// Phase 2: Suspend the task (frees compute) until the next message arrives
626+
const next = await messagesInput.wait({
627+
timeout: turnTimeout,
628+
spanName: "waiting (suspended)",
629+
});
550630

551-
// If messages arrived during streaming, use the first one immediately
552-
if (pendingMessages.length > 0) {
553-
currentWirePayload = pendingMessages[0]!;
554-
continue;
555-
}
631+
if (!next.ok) {
632+
// Timed out waiting for the next message — end the conversation
633+
return "exit";
634+
}
556635

557-
// Phase 1: Keep the run warm for quick response to the next message.
558-
// The run stays active (using compute) during this window.
559-
if (warmTimeoutInSeconds > 0) {
560-
const warm = await messagesInput.once({
561-
timeoutMs: warmTimeoutInSeconds * 1000,
562-
});
563-
564-
if (warm.ok) {
565-
// Message arrived while warm — respond instantly
566-
currentWirePayload = warm.output;
567-
continue;
636+
currentWirePayload = next.output;
637+
return "continue";
638+
},
639+
{
640+
attributes: turnAttributes,
568641
}
569-
}
570-
571-
// Phase 2: Suspend the task (frees compute) until the next message arrives
572-
const next = await messagesInput.wait({ timeout: turnTimeout });
573-
574-
if (!next.ok) {
575-
// Timed out waiting for the next message — end the conversation
576-
return;
577-
}
642+
);
578643

579-
currentWirePayload = next.output;
644+
if (turnResult === "exit") return;
645+
// "continue" means proceed to next iteration
580646
}
581647
} finally {
582648
stopSub.off();
@@ -621,11 +687,39 @@ export const chat = {
621687
* The frontend transport intercepts this to close the ReadableStream for the current turn.
622688
* @internal
623689
*/
624-
async function writeTurnCompleteChunk(): Promise<void> {
690+
async function writeTurnCompleteChunk(chatId?: string): Promise<void> {
625691
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
692+
spanName: "turn complete",
693+
collapsed: true,
626694
execute: ({ write }) => {
627695
write({ type: "__trigger_turn_complete" });
628696
},
629697
});
630698
await waitUntilComplete();
631699
}
700+
701+
/**
702+
* Extracts the text content of the last user message from a UIMessage array.
703+
* Returns undefined if no user message is found.
704+
* @internal
705+
*/
706+
function extractLastUserMessageText(messages: UIMessage[]): string | undefined {
707+
for (let i = messages.length - 1; i >= 0; i--) {
708+
const msg = messages[i]!;
709+
if (msg.role !== "user") continue;
710+
711+
// UIMessage uses parts array
712+
if (msg.parts) {
713+
const textParts = msg.parts
714+
.filter((p: any) => p.type === "text" && p.text)
715+
.map((p: any) => p.text as string);
716+
if (textParts.length > 0) {
717+
return textParts.join("\n");
718+
}
719+
}
720+
721+
break;
722+
}
723+
724+
return undefined;
725+
}

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ function pipe<T>(
139139
opts = valueOrOptions as PipeStreamOptions | undefined;
140140
}
141141

142-
return pipeInternal(key, value, opts, "streams.pipe()");
142+
return pipeInternal(key, value, opts, opts?.spanName ?? "streams.pipe()");
143143
}
144144

145145
/**
@@ -167,6 +167,7 @@ function pipeInternal<T>(
167167
[SemanticInternalAttributes.ENTITY_TYPE]: "realtime-stream",
168168
[SemanticInternalAttributes.ENTITY_ID]: `${runId}:${key}`,
169169
[SemanticInternalAttributes.STYLE_ICON]: "streams",
170+
...(opts?.collapsed ? { [SemanticInternalAttributes.COLLAPSED]: true } : {}),
170171
...accessoryAttributes({
171172
items: [
172173
{
@@ -640,7 +641,7 @@ function writerInternal<TPart>(key: string, options: WriterStreamOptions<TPart>)
640641
}
641642
});
642643

643-
return pipeInternal(key, stream, options, "streams.writer()");
644+
return pipeInternal(key, stream, options, options.spanName ?? "streams.writer()");
644645
}
645646

646647
export type RealtimeDefineStreamOptions = {
@@ -713,7 +714,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
713714
return new InputStreamOncePromise<TData>((resolve, reject) => {
714715
tracer
715716
.startActiveSpan(
716-
`inputStream.once()`,
717+
options?.spanName ?? `inputStream.once()`,
717718
async () => {
718719
const result = await innerPromise;
719720
resolve(result as InputStreamOnceResult<TData>);
@@ -761,7 +762,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
761762
});
762763

763764
const result = await tracer.startActiveSpan(
764-
`inputStream.wait()`,
765+
options?.spanName ?? `inputStream.wait()`,
765766
async (span) => {
766767
// 1. Block the run on the waitpoint
767768
const waitResponse = await apiClient.waitForWaitpointToken({

0 commit comments

Comments
 (0)