
Commit 1062756

🤖 fix: prevent RetryBarrier flashes during stream startup
1 parent f5aef11 commit 1062756

File tree

15 files changed: +168 −39 lines


src/browser/stores/WorkspaceStore.ts

Lines changed: 12 additions & 2 deletions
@@ -159,6 +159,15 @@ export class WorkspaceStore {
       data: WorkspaceChatMessage
     ) => void
   > = {
+    "stream-pending": (workspaceId, aggregator, data) => {
+      aggregator.handleStreamPending(data as never);
+      if (this.onModelUsed) {
+        this.onModelUsed((data as { model: string }).model);
+      }
+      this.states.bump(workspaceId);
+      // Bump usage store so liveUsage can show the current model even before streaming starts
+      this.usageStore.bump(workspaceId);
+    },
     "stream-start": (workspaceId, aggregator, data) => {
       aggregator.handleStreamStart(data as never);
       if (this.onModelUsed) {
@@ -478,7 +487,7 @@
       name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing
       messages: aggregator.getDisplayedMessages(),
       queuedMessage: this.queuedMessages.get(workspaceId) ?? null,
-      canInterrupt: activeStreams.length > 0,
+      canInterrupt: activeStreams.length > 0 || aggregator.hasConnectingStreams(),
       isCompacting: aggregator.isCompacting(),
       loading: !hasMessages && !isCaughtUp,
       muxMessages: messages,
@@ -960,7 +969,8 @@
     // Check if there's an active stream in buffered events (reconnection scenario)
     const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
     const hasActiveStream = pendingEvents.some(
-      (event) => "type" in event && event.type === "stream-start"
+      (event) =>
+        "type" in event && (event.type === "stream-start" || event.type === "stream-pending")
     );

     // Load historical messages first
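
Taken together, the store changes mean the interrupt affordance no longer waits for the first delta. A minimal sketch of the derived flag, using stand-ins for the store's internal state (the parameter shapes are assumptions, not the real fields):

```ts
// Sketch only: how canInterrupt is derived once connecting streams are tracked.
function deriveCanInterrupt(
  activeStreams: unknown[],
  aggregator: { hasConnectingStreams(): boolean }
): boolean {
  return activeStreams.length > 0 || aggregator.hasConnectingStreams();
}

// During the connecting phase there are no active streams yet, but the backend
// has registered the stream, so interruption is already possible:
deriveCanInterrupt([], { hasConnectingStreams: () => true }); // true
```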

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 63 additions & 27 deletions
@@ -7,6 +7,7 @@ import type {
 } from "@/common/types/message";
 import { createMuxMessage } from "@/common/types/message";
 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   UsageDeltaEvent,
@@ -75,6 +76,9 @@ function hasFailureResult(result: unknown): boolean {
 }

 export class StreamingMessageAggregator {
+  // Streams that have been registered/started in the backend but haven't emitted stream-start yet.
+  // This is the "connecting" phase: abort should work, but no deltas have started.
+  private connectingStreams = new Map<string, { startTime: number; model: string }>();
   private messages = new Map<string, MuxMessage>();
   private activeStreams = new Map<string, StreamingContext>();

@@ -264,6 +268,7 @@
   */
  private cleanupStreamState(messageId: string): void {
    this.activeStreams.delete(messageId);
+    this.connectingStreams.delete(messageId);
    // Clear todos when stream ends - they're stream-scoped state
    // On reload, todos will be reconstructed from completed tool_write calls in history
    this.currentTodos = [];
@@ -372,6 +377,9 @@
    this.pendingStreamStartTime = time;
  }

+  hasConnectingStreams(): boolean {
+    return this.connectingStreams.size > 0;
+  }
  getActiveStreams(): StreamingContext[] {
    return Array.from(this.activeStreams.values());
  }
@@ -399,6 +407,11 @@
      return context.model;
    }

+    // If we're connecting (stream-pending), return that model
+    for (const context of this.connectingStreams.values()) {
+      return context.model;
+    }
+
    // Otherwise, return the model from the most recent assistant message
    const messages = this.getAllMessages();
    for (let i = messages.length - 1; i >= 0; i--) {
@@ -418,6 +431,7 @@
  clear(): void {
    this.messages.clear();
    this.activeStreams.clear();
+    this.connectingStreams.clear();
    this.invalidateCache();
  }

@@ -440,9 +454,30 @@
  }

  // Unified event handlers that encapsulate all complex logic
+  handleStreamPending(data: StreamPendingEvent): void {
+    // Clear pending stream start timestamp - backend has accepted the request.
+    this.setPendingStreamStartTime(null);
+
+    this.connectingStreams.set(data.messageId, { startTime: Date.now(), model: data.model });
+
+    // Create a placeholder assistant message (kept invisible until parts arrive)
+    // so that out-of-order deltas (if they ever occur) have somewhere to attach.
+    if (!this.messages.has(data.messageId)) {
+      const connectingMessage = createMuxMessage(data.messageId, "assistant", "", {
+        historySequence: data.historySequence,
+        timestamp: Date.now(),
+        model: data.model,
+      });
+      this.messages.set(data.messageId, connectingMessage);
+    }
+
+    this.invalidateCache();
+  }
+
  handleStreamStart(data: StreamStartEvent): void {
-    // Clear pending stream start timestamp - stream has started
+    // Clear pending/connecting state - stream has started.
    this.setPendingStreamStartTime(null);
+    this.connectingStreams.delete(data.messageId);

    // NOTE: We do NOT clear agentStatus or currentTodos here.
    // They are cleared when a new user message arrives (see handleMessage),
@@ -577,10 +612,10 @@
  }

  handleStreamError(data: StreamErrorMessage): void {
-    // Direct lookup by messageId
-    const activeStream = this.activeStreams.get(data.messageId);
+    const isTrackedStream =
+      this.activeStreams.has(data.messageId) || this.connectingStreams.has(data.messageId);

-    if (activeStream) {
+    if (isTrackedStream) {
      // Mark the message with error metadata
      const message = this.messages.get(data.messageId);
      if (message?.metadata) {
@@ -589,32 +624,33 @@
        message.metadata.errorType = data.errorType;
      }

-      // Clean up stream-scoped state (active stream tracking, TODOs)
+      // Clean up stream-scoped state (active/connecting tracking, TODOs)
      this.cleanupStreamState(data.messageId);
      this.invalidateCache();
-    } else {
-      // Pre-stream error (e.g., API key not configured before streaming starts)
-      // Create a synthetic error message since there's no active stream to attach to
-      // Get the highest historySequence from existing messages so this appears at the end
-      const maxSequence = Math.max(
-        0,
-        ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
-      );
-      const errorMessage: MuxMessage = {
-        id: data.messageId,
-        role: "assistant",
-        parts: [],
-        metadata: {
-          partial: true,
-          error: data.error,
-          errorType: data.errorType,
-          timestamp: Date.now(),
-          historySequence: maxSequence + 1,
-        },
-      };
-      this.messages.set(data.messageId, errorMessage);
-      this.invalidateCache();
+      return;
    }
+
+    // Pre-stream error (e.g., API key not configured before streaming starts)
+    // Create a synthetic error message since there's no tracked stream to attach to.
+    // Get the highest historySequence from existing messages so this appears at the end.
+    const maxSequence = Math.max(
+      0,
+      ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
+    );
+    const errorMessage: MuxMessage = {
+      id: data.messageId,
+      role: "assistant",
+      parts: [],
+      metadata: {
+        partial: true,
+        error: data.error,
+        errorType: data.errorType,
+        timestamp: Date.now(),
+        historySequence: maxSequence + 1,
+      },
+    };
+    this.messages.set(data.messageId, errorMessage);
+    this.invalidateCache();
  }

  handleToolCallStart(data: ToolCallStartEvent): void {
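
The aggregator now moves a stream through a short connecting phase before it becomes active. A lifecycle sketch, assuming the class takes no constructor arguments, the "@/" path alias resolves as elsewhere in the repo, and the event literal matches the schema added in this commit:

```ts
import type { StreamPendingEvent, StreamStartEvent } from "@/common/types/stream";
import { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator";

declare const startEvent: StreamStartEvent; // assumed: a stream-start for the same messageId

const aggregator = new StreamingMessageAggregator(); // assumed no-arg constructor

const pending: StreamPendingEvent = {
  type: "stream-pending",
  workspaceId: "ws-1",
  messageId: "msg-1",
  model: "example-model",
  historySequence: 5,
};

aggregator.handleStreamPending(pending);
aggregator.hasConnectingStreams(); // true: abort works, but no deltas have arrived

aggregator.handleStreamStart(startEvent);
aggregator.hasConnectingStreams(); // false: msg-1 moved from connecting to active
```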

src/browser/utils/messages/retryEligibility.test.ts

Lines changed: 29 additions & 0 deletions
@@ -213,6 +213,35 @@
     expect(hasInterruptedStream(messages, null)).toBe(true);
   });

+  it("returns false when pendingStreamStartTime is null but last user message timestamp is recent (replay/reload)", () => {
+    const justSentTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS - 500);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: justSentTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(false);
+  });
+
+  it("returns true when pendingStreamStartTime is null and last user message timestamp is old (replay/reload)", () => {
+    const longAgoTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS + 1000);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: longAgoTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(true);
+  });
   it("returns false when user message just sent (within grace period)", () => {
     const messages: DisplayedMessage[] = [
       {

src/browser/utils/messages/retryEligibility.ts

Lines changed: 13 additions & 7 deletions
@@ -80,16 +80,22 @@ export function hasInterruptedStream(
 ): boolean {
   if (messages.length === 0) return false;

-  // Don't show retry barrier if user message was sent very recently (within the grace period)
-  // This prevents flash during normal send flow while stream-start event arrives
-  // After the grace period, assume something is wrong and show the barrier
-  if (pendingStreamStartTime !== null) {
-    const elapsed = Date.now() - pendingStreamStartTime;
+  const lastMessage = messages[messages.length - 1];
+
+  // Don't show retry barrier if the last user message was sent very recently (within the grace period).
+  //
+  // We prefer the explicit pendingStreamStartTime (set during the live send flow).
+  // But during history replay / app reload, pendingStreamStartTime can be null even when the last
+  // message is a fresh user message. In that case, fall back to the user message timestamp.
+  const graceStartTime =
+    pendingStreamStartTime ??
+    (lastMessage.type === "user" ? (lastMessage.timestamp ?? null) : null);
+
+  if (graceStartTime !== null) {
+    const elapsed = Date.now() - graceStartTime;
     if (elapsed < PENDING_STREAM_START_GRACE_PERIOD_MS) return false;
   }

-  const lastMessage = messages[messages.length - 1];
-
   return (
     lastMessage.type === "stream-error" || // Stream errored out (show UI for ALL error types)
     lastMessage.type === "user" || // No response received yet (app restart during slow model)

src/common/orpc/schemas.ts

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ export {
   StreamDeltaEventSchema,
   StreamEndEventSchema,
   StreamErrorMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,

src/common/orpc/schemas/stream.ts

Lines changed: 12 additions & 0 deletions
@@ -27,6 +27,17 @@ export const DeleteMessageSchema = z.object({
   historySequences: z.array(z.number()),
 });

+// Emitted when a stream has been registered and is abortable, but before streaming begins.
+// This prevents RetryBarrier flash during slow provider connection/setup.
+export const StreamPendingEventSchema = z.object({
+  type: z.literal("stream-pending"),
+  workspaceId: z.string(),
+  messageId: z.string(),
+  model: z.string(),
+  historySequence: z.number().meta({
+    description: "Backend assigns global message ordering",
+  }),
+});
 export const StreamStartEventSchema = z.object({
   type: z.literal("stream-start"),
   workspaceId: z.string(),
@@ -261,6 +272,7 @@ export const WorkspaceChatMessageSchema = z.discriminatedUnion("type", [
   CaughtUpMessageSchema,
   StreamErrorMessageSchema,
   DeleteMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
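
For reference, a payload that satisfies the new schema; the field values are illustrative, only the shape comes from StreamPendingEventSchema above:

```ts
import { StreamPendingEventSchema } from "@/common/orpc/schemas/stream";

// parse() throws if the payload drifts from the schema.
const event = StreamPendingEventSchema.parse({
  type: "stream-pending",
  workspaceId: "ws-1",
  messageId: "msg-1",
  model: "example-model",
  historySequence: 12,
});
// event.type is narrowed to the literal "stream-pending"
```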

src/common/orpc/types.ts

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ import type { z } from "zod";
 import type * as schemas from "./schemas";

 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   StreamEndEvent,
@@ -43,6 +44,10 @@ export function isStreamError(msg: WorkspaceChatMessage): msg is StreamErrorMess
   return (msg as { type?: string }).type === "stream-error";
 }

+export function isStreamPending(msg: WorkspaceChatMessage): msg is StreamPendingEvent {
+  return (msg as { type?: string }).type === "stream-pending";
+}
+
 export function isDeleteMessage(msg: WorkspaceChatMessage): msg is DeleteMessage {
   return (msg as { type?: string }).type === "delete";
 }
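
A usage sketch for the new type guard. It assumes WorkspaceChatMessage is exported from the same module as the guards, which the diff does not show directly:

```ts
import { isStreamPending, type WorkspaceChatMessage } from "@/common/orpc/types";

// Narrow an incoming chat message before dispatching; inside the branch the
// compiler knows msg is a StreamPendingEvent.
function describeIfConnecting(msg: WorkspaceChatMessage): string | null {
  if (isStreamPending(msg)) {
    return `connecting ${msg.messageId} with ${msg.model}`;
  }
  return null;
}
```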

src/common/types/stream.ts

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,7 @@ import type {
   StreamAbortEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,
@@ -22,6 +23,7 @@
  * Completed message part (reasoning, text, or tool) suitable for serialization
  * Used in StreamEndEvent and partial message storage
  */
+export type StreamPendingEvent = z.infer<typeof StreamPendingEventSchema>;
 export type CompletedMessagePart = MuxReasoningPart | MuxTextPart | MuxToolPart;

 export type StreamStartEvent = z.infer<typeof StreamStartEventSchema>;
@@ -45,6 +47,7 @@ export type ReasoningEndEvent = z.infer<typeof ReasoningEndEventSchema>;
 export type UsageDeltaEvent = z.infer<typeof UsageDeltaEventSchema>;

 export type AIServiceEvent =
+  | StreamPendingEvent
   | StreamStartEvent
   | StreamDeltaEvent
   | StreamEndEvent
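
Because AIServiceEvent gained a member, any exhaustive handling of the union now needs a "stream-pending" arm. A sketch, assuming every member of the union carries a literal type field as the schemas suggest:

```ts
import type { AIServiceEvent } from "@/common/types/stream";

function label(event: AIServiceEvent): string {
  switch (event.type) {
    case "stream-pending":
      // Connecting phase: the model is known even though no tokens have arrived.
      return `connecting (${event.model})`;
    default:
      return event.type;
  }
}
```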

src/node/services/agentSession.ts

Lines changed: 1 addition & 0 deletions
@@ -619,6 +619,7 @@ export class AgentSession {
       this.aiService.on(event, wrapped as never);
     };

+    forward("stream-pending", (payload) => this.emitChatEvent(payload));
     forward("stream-start", (payload) => this.emitChatEvent(payload));
     forward("stream-delta", (payload) => this.emitChatEvent(payload));
     forward("tool-call-start", (payload) => this.emitChatEvent(payload));

src/node/services/aiService.ts

Lines changed: 1 addition & 0 deletions
@@ -318,6 +318,7 @@ export class AIService extends EventEmitter {
    * Forward all stream events from StreamManager to AIService consumers
    */
   private setupStreamEventForwarding(): void {
+    this.streamManager.on("stream-pending", (data) => this.emit("stream-pending", data));
     this.streamManager.on("stream-start", (data) => this.emit("stream-start", data));
     this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
     this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));

0 commit comments
