Commit a2f7e17

🤖 fix: prevent RetryBarrier flashes during stream startup
1 parent 6fcca8b

15 files changed, +168 −39 lines changed

src/browser/stores/WorkspaceStore.ts

Lines changed: 12 additions & 2 deletions
@@ -161,6 +161,15 @@ export class WorkspaceStore {
       data: WorkspaceChatMessage
     ) => void
   > = {
+    "stream-pending": (workspaceId, aggregator, data) => {
+      aggregator.handleStreamPending(data as never);
+      if (this.onModelUsed) {
+        this.onModelUsed((data as { model: string }).model);
+      }
+      this.states.bump(workspaceId);
+      // Bump usage store so liveUsage can show the current model even before streaming starts
+      this.usageStore.bump(workspaceId);
+    },
     "stream-start": (workspaceId, aggregator, data) => {
       aggregator.handleStreamStart(data as never);
       if (this.onModelUsed) {
@@ -480,7 +489,7 @@ export class WorkspaceStore {
       name: metadata?.name ?? workspaceId, // Fall back to ID if metadata missing
       messages: aggregator.getDisplayedMessages(),
       queuedMessage: this.queuedMessages.get(workspaceId) ?? null,
-      canInterrupt: activeStreams.length > 0,
+      canInterrupt: activeStreams.length > 0 || aggregator.hasConnectingStreams(),
       isCompacting: aggregator.isCompacting(),
       awaitingUserQuestion: aggregator.hasAwaitingUserQuestion(),
       loading: !hasMessages && !isCaughtUp,
@@ -965,7 +974,8 @@ export class WorkspaceStore {
     // Check if there's an active stream in buffered events (reconnection scenario)
     const pendingEvents = this.pendingStreamEvents.get(workspaceId) ?? [];
     const hasActiveStream = pendingEvents.some(
-      (event) => "type" in event && event.type === "stream-start"
+      (event) =>
+        "type" in event && (event.type === "stream-start" || event.type === "stream-pending")
     );
 
     // Load historical messages first
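
For context, a minimal sketch (not code from this commit) of why the canInterrupt change matters: a stream that has only emitted stream-pending is not yet in activeStreams, so without hasConnectingStreams() the interrupt control would stay disabled during the connecting phase. The standalone function below is illustrative only.

    // Sketch only: mirrors the canInterrupt expression above in isolation.
    // `aggregator` stands in for a StreamingMessageAggregator instance.
    function deriveCanInterrupt(
      activeStreamCount: number,
      aggregator: { hasConnectingStreams(): boolean }
    ): boolean {
      // True while deltas are flowing, or while the backend has accepted the
      // request (stream-pending) but has not yet emitted stream-start.
      return activeStreamCount > 0 || aggregator.hasConnectingStreams();
    }

    // deriveCanInterrupt(0, aggregator) === true while a stream is still connecting,
    // so the UI can offer "stop" before the first delta arrives.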

src/browser/utils/messages/StreamingMessageAggregator.ts

Lines changed: 63 additions & 27 deletions
@@ -7,6 +7,7 @@ import type {
 } from "@/common/types/message";
 import { createMuxMessage } from "@/common/types/message";
 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   UsageDeltaEvent,
@@ -75,6 +76,9 @@ function hasFailureResult(result: unknown): boolean {
 }
 
 export class StreamingMessageAggregator {
+  // Streams that have been registered/started in the backend but haven't emitted stream-start yet.
+  // This is the "connecting" phase: abort should work, but no deltas have started.
+  private connectingStreams = new Map<string, { startTime: number; model: string }>();
   private messages = new Map<string, MuxMessage>();
   private activeStreams = new Map<string, StreamingContext>();
 
@@ -283,6 +287,7 @@ export class StreamingMessageAggregator {
    */
   private cleanupStreamState(messageId: string): void {
     this.activeStreams.delete(messageId);
+    this.connectingStreams.delete(messageId);
     // Clear todos when stream ends - they're stream-scoped state
     // On reload, todos will be reconstructed from completed tool_write calls in history
     this.currentTodos = [];
@@ -391,6 +396,9 @@ export class StreamingMessageAggregator {
     this.pendingStreamStartTime = time;
   }
 
+  hasConnectingStreams(): boolean {
+    return this.connectingStreams.size > 0;
+  }
   getActiveStreams(): StreamingContext[] {
     return Array.from(this.activeStreams.values());
   }
@@ -418,6 +426,11 @@ export class StreamingMessageAggregator {
       return context.model;
     }
 
+    // If we're connecting (stream-pending), return that model
+    for (const context of this.connectingStreams.values()) {
+      return context.model;
+    }
+
     // Otherwise, return the model from the most recent assistant message
     const messages = this.getAllMessages();
     for (let i = messages.length - 1; i >= 0; i--) {
@@ -437,6 +450,7 @@ export class StreamingMessageAggregator {
   clear(): void {
     this.messages.clear();
     this.activeStreams.clear();
+    this.connectingStreams.clear();
     this.invalidateCache();
   }
 
@@ -459,9 +473,30 @@ export class StreamingMessageAggregator {
   }
 
   // Unified event handlers that encapsulate all complex logic
+  handleStreamPending(data: StreamPendingEvent): void {
+    // Clear pending stream start timestamp - backend has accepted the request.
+    this.setPendingStreamStartTime(null);
+
+    this.connectingStreams.set(data.messageId, { startTime: Date.now(), model: data.model });
+
+    // Create a placeholder assistant message (kept invisible until parts arrive)
+    // so that out-of-order deltas (if they ever occur) have somewhere to attach.
+    if (!this.messages.has(data.messageId)) {
+      const connectingMessage = createMuxMessage(data.messageId, "assistant", "", {
+        historySequence: data.historySequence,
+        timestamp: Date.now(),
+        model: data.model,
+      });
+      this.messages.set(data.messageId, connectingMessage);
+    }
+
+    this.invalidateCache();
+  }
+
   handleStreamStart(data: StreamStartEvent): void {
-    // Clear pending stream start timestamp - stream has started
+    // Clear pending/connecting state - stream has started.
     this.setPendingStreamStartTime(null);
+    this.connectingStreams.delete(data.messageId);
 
     // NOTE: We do NOT clear agentStatus or currentTodos here.
     // They are cleared when a new user message arrives (see handleMessage),
@@ -596,10 +631,10 @@ export class StreamingMessageAggregator {
   }
 
   handleStreamError(data: StreamErrorMessage): void {
-    // Direct lookup by messageId
-    const activeStream = this.activeStreams.get(data.messageId);
+    const isTrackedStream =
+      this.activeStreams.has(data.messageId) || this.connectingStreams.has(data.messageId);
 
-    if (activeStream) {
+    if (isTrackedStream) {
       // Mark the message with error metadata
       const message = this.messages.get(data.messageId);
       if (message?.metadata) {
@@ -608,32 +643,33 @@ export class StreamingMessageAggregator {
         message.metadata.errorType = data.errorType;
       }
 
-      // Clean up stream-scoped state (active stream tracking, TODOs)
+      // Clean up stream-scoped state (active/connecting tracking, TODOs)
       this.cleanupStreamState(data.messageId);
       this.invalidateCache();
-    } else {
-      // Pre-stream error (e.g., API key not configured before streaming starts)
-      // Create a synthetic error message since there's no active stream to attach to
-      // Get the highest historySequence from existing messages so this appears at the end
-      const maxSequence = Math.max(
-        0,
-        ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
-      );
-      const errorMessage: MuxMessage = {
-        id: data.messageId,
-        role: "assistant",
-        parts: [],
-        metadata: {
-          partial: true,
-          error: data.error,
-          errorType: data.errorType,
-          timestamp: Date.now(),
-          historySequence: maxSequence + 1,
-        },
-      };
-      this.messages.set(data.messageId, errorMessage);
-      this.invalidateCache();
+      return;
     }
+
+    // Pre-stream error (e.g., API key not configured before streaming starts)
+    // Create a synthetic error message since there's no tracked stream to attach to.
+    // Get the highest historySequence from existing messages so this appears at the end.
+    const maxSequence = Math.max(
+      0,
+      ...Array.from(this.messages.values()).map((m) => m.metadata?.historySequence ?? 0)
+    );
+    const errorMessage: MuxMessage = {
+      id: data.messageId,
+      role: "assistant",
+      parts: [],
+      metadata: {
+        partial: true,
+        error: data.error,
+        errorType: data.errorType,
+        timestamp: Date.now(),
+        historySequence: maxSequence + 1,
+      },
+    };
+    this.messages.set(data.messageId, errorMessage);
+    this.invalidateCache();
   }
 
   handleToolCallStart(data: ToolCallStartEvent): void {
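
A hedged sketch of the new connecting-phase lifecycle. The stream-pending payload fields come from StreamPendingEventSchema further down; aggregator construction, the import alias, and the full stream-start payload are not shown in this commit and are assumed here.

    import type { StreamingMessageAggregator } from "@/browser/utils/messages/StreamingMessageAggregator"; // alias path assumed

    declare const aggregator: StreamingMessageAggregator; // construction omitted

    // 1. Backend accepts the request: the stream becomes "connecting" (abortable, no deltas yet).
    aggregator.handleStreamPending({
      type: "stream-pending",
      workspaceId: "ws-1", // illustrative values
      messageId: "msg-1",
      model: "some-model",
      historySequence: 42,
    });
    aggregator.hasConnectingStreams(); // true: canInterrupt stays true, no RetryBarrier

    // 2. stream-start (payload not shown here) removes the entry from connectingStreams;
    //    stream-error and cleanupStreamState clear it as well.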

src/browser/utils/messages/retryEligibility.test.ts

Lines changed: 29 additions & 0 deletions
@@ -213,6 +213,35 @@ describe("hasInterruptedStream", () => {
     expect(hasInterruptedStream(messages, null)).toBe(true);
   });
 
+  it("returns false when pendingStreamStartTime is null but last user message timestamp is recent (replay/reload)", () => {
+    const justSentTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS - 500);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: justSentTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(false);
+  });
+
+  it("returns true when pendingStreamStartTime is null and last user message timestamp is old (replay/reload)", () => {
+    const longAgoTimestamp = Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS + 1000);
+    const messages: DisplayedMessage[] = [
+      {
+        type: "user",
+        id: "user-1",
+        historyId: "user-1",
+        content: "Hello",
+        historySequence: 1,
+        timestamp: longAgoTimestamp,
+      },
+    ];
+    expect(hasInterruptedStream(messages, null)).toBe(true);
+  });
   it("returns false when user message just sent (within grace period)", () => {
     const messages: DisplayedMessage[] = [
       {

src/browser/utils/messages/retryEligibility.ts

Lines changed: 13 additions & 7 deletions
@@ -80,16 +80,22 @@ export function hasInterruptedStream(
 ): boolean {
   if (messages.length === 0) return false;
 
-  // Don't show retry barrier if user message was sent very recently (within the grace period)
-  // This prevents flash during normal send flow while stream-start event arrives
-  // After the grace period, assume something is wrong and show the barrier
-  if (pendingStreamStartTime !== null) {
-    const elapsed = Date.now() - pendingStreamStartTime;
+  const lastMessage = messages[messages.length - 1];
+
+  // Don't show retry barrier if the last user message was sent very recently (within the grace period).
+  //
+  // We prefer the explicit pendingStreamStartTime (set during the live send flow).
+  // But during history replay / app reload, pendingStreamStartTime can be null even when the last
+  // message is a fresh user message. In that case, fall back to the user message timestamp.
+  const graceStartTime =
+    pendingStreamStartTime ??
+    (lastMessage.type === "user" ? (lastMessage.timestamp ?? null) : null);
+
+  if (graceStartTime !== null) {
+    const elapsed = Date.now() - graceStartTime;
     if (elapsed < PENDING_STREAM_START_GRACE_PERIOD_MS) return false;
   }
 
-  const lastMessage = messages[messages.length - 1];
-
   return (
     lastMessage.type === "stream-error" || // Stream errored out (show UI for ALL error types)
     lastMessage.type === "user" || // No response received yet (app restart during slow model)
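
To make the fallback concrete, a small usage sketch mirroring the added tests: with pendingStreamStartTime === null (as after a reload), the last user message's timestamp now seeds the grace window. Import paths and export locations below are assumptions, not shown in this commit.

    import { hasInterruptedStream, PENDING_STREAM_START_GRACE_PERIOD_MS } from "./retryEligibility"; // paths/exports assumed
    import type { DisplayedMessage } from "@/common/types/message"; // import source assumed

    // Reload right after sending: no pendingStreamStartTime, but the user message is fresh.
    const justSent: DisplayedMessage = {
      type: "user",
      id: "user-1",
      historyId: "user-1",
      content: "Hello",
      historySequence: 1,
      timestamp: Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS - 500),
    };
    hasInterruptedStream([justSent], null); // false: no RetryBarrier flash

    // The same message aged past the grace period triggers the barrier again.
    hasInterruptedStream(
      [{ ...justSent, timestamp: Date.now() - (PENDING_STREAM_START_GRACE_PERIOD_MS + 1000) }],
      null
    ); // true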

src/common/orpc/schemas.ts

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ export {
   StreamDeltaEventSchema,
   StreamEndEventSchema,
   StreamErrorMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,

src/common/orpc/schemas/stream.ts

Lines changed: 12 additions & 0 deletions
@@ -27,6 +27,17 @@ export const DeleteMessageSchema = z.object({
   historySequences: z.array(z.number()),
 });
 
+// Emitted when a stream has been registered and is abortable, but before streaming begins.
+// This prevents RetryBarrier flash during slow provider connection/setup.
+export const StreamPendingEventSchema = z.object({
+  type: z.literal("stream-pending"),
+  workspaceId: z.string(),
+  messageId: z.string(),
+  model: z.string(),
+  historySequence: z.number().meta({
+    description: "Backend assigns global message ordering",
+  }),
+});
 export const StreamStartEventSchema = z.object({
   type: z.literal("stream-start"),
   workspaceId: z.string(),
@@ -261,6 +272,7 @@ export const WorkspaceChatMessageSchema = z.discriminatedUnion("type", [
   CaughtUpMessageSchema,
   StreamErrorMessageSchema,
   DeleteMessageSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
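
A small sketch of validating a stream-pending payload with the new schema. Field values are illustrative, and the import path assumes the re-export from src/common/orpc/schemas.ts shown above.

    import { StreamPendingEventSchema } from "@/common/orpc/schemas"; // path assumed from the re-export above

    // Parse (and type) an incoming payload before dispatching it to the store/aggregator.
    const pending = StreamPendingEventSchema.parse({
      type: "stream-pending",
      workspaceId: "ws-1", // illustrative values
      messageId: "msg-1",
      model: "some-model",
      historySequence: 7,
    });

    // `pending` now satisfies StreamPendingEvent; WorkspaceChatMessageSchema accepts
    // the same object through its "type" discriminated union.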

src/common/orpc/types.ts

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ import type { z } from "zod";
 import type * as schemas from "./schemas";
 
 import type {
+  StreamPendingEvent,
   StreamStartEvent,
   StreamDeltaEvent,
   StreamEndEvent,
@@ -43,6 +44,10 @@ export function isStreamError(msg: WorkspaceChatMessage): msg is StreamErrorMess
   return (msg as { type?: string }).type === "stream-error";
 }
 
+export function isStreamPending(msg: WorkspaceChatMessage): msg is StreamPendingEvent {
+  return (msg as { type?: string }).type === "stream-pending";
+}
+
 export function isDeleteMessage(msg: WorkspaceChatMessage): msg is DeleteMessage {
   return (msg as { type?: string }).type === "delete";
 }
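
Usage sketch for the new type guard; the import paths and the WorkspaceChatMessage export location are assumed.

    import { isStreamPending } from "@/common/orpc/types"; // path assumed
    import type { WorkspaceChatMessage } from "@/common/orpc/types"; // export location assumed

    function describeEvent(msg: WorkspaceChatMessage): string {
      if (isStreamPending(msg)) {
        // Narrowed to StreamPendingEvent: registered and abortable, but no deltas yet.
        return `connecting ${msg.messageId} (${msg.model})`;
      }
      return "other chat event";
    }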

src/common/types/stream.ts

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,7 @@ import type {
   StreamAbortEventSchema,
   StreamDeltaEventSchema,
   StreamEndEventSchema,
+  StreamPendingEventSchema,
   StreamStartEventSchema,
   ToolCallDeltaEventSchema,
   ToolCallEndEventSchema,
@@ -22,6 +23,7 @@ import type {
  * Completed message part (reasoning, text, or tool) suitable for serialization
  * Used in StreamEndEvent and partial message storage
  */
+export type StreamPendingEvent = z.infer<typeof StreamPendingEventSchema>;
 export type CompletedMessagePart = MuxReasoningPart | MuxTextPart | MuxToolPart;
 
 export type StreamStartEvent = z.infer<typeof StreamStartEventSchema>;
@@ -45,6 +47,7 @@ export type ReasoningEndEvent = z.infer<typeof ReasoningEndEventSchema>;
 export type UsageDeltaEvent = z.infer<typeof UsageDeltaEventSchema>;
 
 export type AIServiceEvent =
+  | StreamPendingEvent
   | StreamStartEvent
   | StreamDeltaEvent
   | StreamEndEvent

src/node/services/agentSession.ts

Lines changed: 1 addition & 0 deletions
@@ -619,6 +619,7 @@ export class AgentSession {
       this.aiService.on(event, wrapped as never);
     };
 
+    forward("stream-pending", (payload) => this.emitChatEvent(payload));
     forward("stream-start", (payload) => this.emitChatEvent(payload));
     forward("stream-delta", (payload) => this.emitChatEvent(payload));
     forward("tool-call-start", (payload) => this.emitChatEvent(payload));

src/node/services/aiService.ts

Lines changed: 1 addition & 0 deletions
@@ -348,6 +348,7 @@ export class AIService extends EventEmitter {
    * Forward all stream events from StreamManager to AIService consumers
    */
  private setupStreamEventForwarding(): void {
+    this.streamManager.on("stream-pending", (data) => this.emit("stream-pending", data));
    this.streamManager.on("stream-start", (data) => this.emit("stream-start", data));
    this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
    this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));
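
The two forwarding hops (StreamManager to AIService above, then AIService to AgentSession.emitChatEvent) follow the same re-emit pattern; a generic sketch with illustrative names, not code from this commit:

    import { EventEmitter } from "node:events";

    // Re-emit a named event from an inner emitter on an outer one, payload unchanged.
    function forwardEvent(source: EventEmitter, target: EventEmitter, name: string): void {
      source.on(name, (payload) => target.emit(name, payload));
    }

    // e.g. forwardEvent(streamManager, aiService, "stream-pending");
    // Each hop added in this commit is one such line, so the renderer receives
    // stream-pending as soon as the backend registers the stream.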
