Skip to content

Commit b57993e

Browse files
committed
The stream.read() span now works better when specifying a startIndex
1 parent b3dfc5e commit b57993e

File tree

8 files changed

+50
-15
lines changed

8 files changed

+50
-15
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { WaitpointPresenter } from "./WaitpointPresenter.server";
1919
import { engine } from "~/v3/runEngine.server";
2020
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
2121
import { IEventRepository, SpanDetail } from "~/v3/eventRepository/eventRepository.types";
22+
import { safeJsonParse } from "~/utils/json";
2223

2324
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
2425
export type Span = NonNullable<NonNullable<Result>["span"]>;
@@ -570,13 +571,18 @@ export class SpanPresenter extends BasePresenter {
570571
return { ...data, entity: null };
571572
}
572573

574+
const metadata = span.entity.metadata
575+
? (safeJsonParse(span.entity.metadata) as Record<string, unknown> | undefined)
576+
: undefined;
577+
573578
return {
574579
...data,
575580
entity: {
576581
type: "realtime-stream" as const,
577582
object: {
578583
runId,
579584
streamKey,
585+
metadata,
580586
},
581587
},
582588
};

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,6 +1161,7 @@ function SpanEntity({ span }: { span: Span }) {
11611161
<RealtimeStreamViewer
11621162
runId={span.entity.object.runId}
11631163
streamKey={span.entity.object.streamKey}
1164+
metadata={span.entity.object.metadata}
11641165
/>
11651166
</div>
11661167
);

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,23 +71,28 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
7171
run.realtimeStreamsVersion
7272
);
7373

74-
return realtimeStream.streamResponse(
75-
request,
76-
run.friendlyId,
77-
streamKey,
78-
request.signal,
79-
lastEventId
80-
);
74+
return realtimeStream.streamResponse(request, run.friendlyId, streamKey, request.signal, {
75+
lastEventId,
76+
});
8177
};
8278

83-
export function RealtimeStreamViewer({ runId, streamKey }: { runId: string; streamKey: string }) {
79+
export function RealtimeStreamViewer({
80+
runId,
81+
streamKey,
82+
metadata,
83+
}: {
84+
runId: string;
85+
streamKey: string;
86+
metadata: Record<string, unknown> | undefined;
87+
}) {
8488
const organization = useOrganization();
8589
const project = useProject();
8690
const environment = useEnvironment();
8791

8892
const resourcePath = `/resources/orgs/${organization.slug}/projects/${project.slug}/env/${environment.slug}/runs/${runId}/streams/${streamKey}`;
8993

90-
const { chunks, error, isConnected } = useRealtimeStream(resourcePath);
94+
const startIndex = typeof metadata?.startIndex === "number" ? metadata.startIndex : undefined;
95+
const { chunks, error, isConnected } = useRealtimeStream(resourcePath, startIndex);
9196
const scrollRef = useRef<HTMLDivElement>(null);
9297
const bottomRef = useRef<HTMLDivElement>(null);
9398
const [isAtBottom, setIsAtBottom] = useState(true);
@@ -124,7 +129,10 @@ export function RealtimeStreamViewer({ runId, streamKey }: { runId: string; stre
124129
}
125130
}, [chunks, isAtBottom]);
126131

127-
const maxLineNumberWidth = chunks.length.toString().length;
132+
const firstLineNumber = startIndex ?? 0;
133+
const lastLineNumber = firstLineNumber + chunks.length - 1;
134+
const maxLineNumberWidth = (chunks.length > 0 ? lastLineNumber : firstLineNumber).toString()
135+
.length;
128136

129137
return (
130138
<div className="flex h-full flex-col overflow-hidden border-t border-grid-bright">
@@ -178,7 +186,7 @@ export function RealtimeStreamViewer({ runId, streamKey }: { runId: string; stre
178186
<StreamChunkLine
179187
key={index}
180188
chunk={chunk}
181-
lineNumber={index + 1}
189+
lineNumber={firstLineNumber + index}
182190
maxLineNumberWidth={maxLineNumberWidth}
183191
/>
184192
))}
@@ -246,7 +254,7 @@ function StreamChunkLine({
246254
);
247255
}
248256

249-
function useRealtimeStream(resourcePath: string) {
257+
function useRealtimeStream(resourcePath: string, startIndex?: number) {
250258
const [chunks, setChunks] = useState<StreamChunk[]>([]);
251259
const [error, setError] = useState<Error | null>(null);
252260
const [isConnected, setIsConnected] = useState(false);
@@ -259,6 +267,8 @@ function useRealtimeStream(resourcePath: string) {
259267
try {
260268
const sseSubscription = new SSEStreamSubscription(resourcePath, {
261269
signal: abortController.signal,
270+
lastEventId: startIndex ? (startIndex - 1).toString() : undefined,
271+
timeoutInSeconds: 30,
262272
});
263273

264274
const stream = await sseSubscription.subscribe();
@@ -300,7 +310,7 @@ function useRealtimeStream(resourcePath: string) {
300310
abortController.abort();
301311
reader?.cancel();
302312
};
303-
}, [resourcePath]);
313+
}, [resourcePath, startIndex]);
304314

305315
return { chunks, error, isConnected };
306316
}

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,19 +424,24 @@ export class ClickhouseEventRepository implements IEventRepository {
424424

425425
private extractEntityFromAttributes(
426426
attributes: Attributes
427-
): { entityType: string; entityId?: string } | undefined {
427+
): { entityType: string; entityId?: string; entityMetadata?: string } | undefined {
428428
if (!attributes || typeof attributes !== "object") {
429429
return undefined;
430430
}
431431

432432
const entityType = attributes[SemanticInternalAttributes.ENTITY_TYPE];
433433
const entityId = attributes[SemanticInternalAttributes.ENTITY_ID];
434+
const entityMetadata = attributes[SemanticInternalAttributes.ENTITY_METADATA];
434435

435436
if (typeof entityType !== "string") {
436437
return undefined;
437438
}
438439

439-
return { entityType, entityId: entityId as string | undefined };
440+
return {
441+
entityType,
442+
entityId: entityId as string | undefined,
443+
entityMetadata: entityMetadata as string | undefined,
444+
};
440445
}
441446

442447
private addToBatch(events: TaskEventV1Input[] | TaskEventV1Input) {
@@ -1101,6 +1106,7 @@ export class ClickhouseEventRepository implements IEventRepository {
11011106
entity: {
11021107
type: undefined,
11031108
id: undefined,
1109+
metadata: undefined,
11041110
},
11051111
metadata: {},
11061112
};
@@ -1140,6 +1146,12 @@ export class ClickhouseEventRepository implements IEventRepository {
11401146
span.entity = {
11411147
id: parsedMetadata.entity.entityId,
11421148
type: parsedMetadata.entity.entityType,
1149+
metadata:
1150+
"entityMetadata" in parsedMetadata.entity &&
1151+
parsedMetadata.entity.entityMetadata &&
1152+
typeof parsedMetadata.entity.entityMetadata === "string"
1153+
? parsedMetadata.entity.entityMetadata
1154+
: undefined,
11431155
};
11441156
}
11451157

apps/webapp/app/v3/eventRepository/eventRepository.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,7 @@ export class EventRepository implements IEventRepository {
783783
SemanticInternalAttributes.ENTITY_TYPE
784784
),
785785
id: rehydrateAttribute<string>(spanEvent.properties, SemanticInternalAttributes.ENTITY_ID),
786+
metadata: undefined,
786787
};
787788

788789
return {

apps/webapp/app/v3/eventRepository/eventRepository.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ export type SpanDetail = {
217217
// Used for entity type switching in SpanEntity
218218
type: string | undefined;
219219
id: string | undefined;
220+
metadata: string | undefined;
220221
};
221222

222223
metadata: any; // Used by SpanPresenter for entity processing

packages/core/src/v3/semanticInternalAttributes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export const SemanticInternalAttributes = {
2929
SPAN: "$span",
3030
ENTITY_TYPE: "$entity.type",
3131
ENTITY_ID: "$entity.id",
32+
ENTITY_METADATA: "$entity.metadata",
3233
OUTPUT: "$output",
3334
OUTPUT_TYPE: "$mime_type_output",
3435
STYLE: "$style",

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ async function readStream<T>(
115115
runId,
116116
[SemanticInternalAttributes.ENTITY_TYPE]: "realtime-stream",
117117
[SemanticInternalAttributes.ENTITY_ID]: `${runId}:${key}`,
118+
[SemanticInternalAttributes.ENTITY_METADATA]: JSON.stringify({
119+
startIndex: options?.startIndex,
120+
}),
118121
[SemanticInternalAttributes.STYLE_ICON]: "streams",
119122
...accessoryAttributes({
120123
items: [

0 commit comments

Comments
 (0)