Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/web-ui/src/flow_chat/services/EventBatcher.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { afterEach, describe, expect, it } from 'vitest';
import { setIncludeSensitiveDiagnostics } from '@/shared/utils/logger';
import { getBatchedEventsLogPayload, summarizeBatchedEventsForLog, type BatchedEvent } from './EventBatcher';
import {
generateToolEventKey,
getBatchedEventsLogPayload,
summarizeBatchedEventsForLog,
type BatchedEvent,
type ToolEventData,
} from './EventBatcher';

describe('summarizeBatchedEventsForLog', () => {
afterEach(() => {
Expand Down Expand Up @@ -65,3 +71,24 @@ describe('summarizeBatchedEventsForLog', () => {
expect(summaryText).not.toContain('src/secret.ts');
});
});

describe('generateToolEventKey', () => {
it('accumulates Write params so argument deltas survive batching', () => {
const keyInfo = generateToolEventKey({
sessionId: 'session-1',
turnId: 'turn-1',
roundId: 'round-1',
toolEvent: {
event_type: 'ParamsPartial',
tool_id: 'tool-1',
tool_name: 'Write',
params: '{"file_path":"src/app.ts"',
},
} satisfies ToolEventData);

expect(keyInfo).toEqual({
key: 'tool:params:session-1:tool-1',
strategy: 'accumulate',
});
});
});
4 changes: 1 addition & 3 deletions src/web-ui/src/flow_chat/services/EventBatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,9 @@ export function generateToolEventKey(data: ToolEventData): { key: string; strate
}

if (eventType === 'ParamsPartial') {
const toolName = (toolEvent as any).tool_name || '';
const isWriteLike = ['write', 'write_notebook', 'file_write', 'Write'].includes(toolName);
return {
key: `tool:params:${sessionId}:${toolUseId}`,
strategy: isWriteLike ? 'replace' : 'accumulate'
strategy: 'accumulate'
};
}
if (eventType === 'Progress') {
Expand Down
72 changes: 71 additions & 1 deletion src/web-ui/src/flow_chat/services/FlowChatManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { FlowChatManager } from './FlowChatManager';
const storeMocks = vi.hoisted(() => ({
store: {} as any,
initializeEventListeners: vi.fn(),
eventBatchers: [] as Array<{
flushNow: ReturnType<typeof vi.fn>;
destroy: ReturnType<typeof vi.fn>;
}>,
}));

vi.mock('./ProcessingStatusManager', () => ({
Expand Down Expand Up @@ -32,7 +36,12 @@ vi.mock('../state-machine', () => ({

vi.mock('./EventBatcher', () => ({
EventBatcher: class {
constructor(private readonly options: { onFlush: (events: Array<{ key: string; payload: unknown }>) => void }) {}
public flushNow = vi.fn();
public destroy = vi.fn();

constructor(private readonly options: { onFlush: (events: Array<{ key: string; payload: unknown }>) => void }) {
storeMocks.eventBatchers.push(this);
}

flush(events: Array<{ key: string; payload: unknown }>): void {
this.options.onFlush(events);
Expand Down Expand Up @@ -104,9 +113,70 @@ describe('FlowChatManager initialization', () => {
beforeEach(() => {
(FlowChatManager as any).instance = undefined;
vi.clearAllMocks();
storeMocks.eventBatchers.length = 0;
storeMocks.initializeEventListeners.mockResolvedValue(() => {});
});

it('flushes and destroys the batcher when the singleton is disposed', () => {
storeMocks.store = {};

const manager = FlowChatManager.getInstance();
const batcher = storeMocks.eventBatchers[0];

FlowChatManager.disposeInstance();

expect(batcher.flushNow).toHaveBeenCalledTimes(1);
expect(batcher.destroy).toHaveBeenCalledTimes(1);
expect(batcher.flushNow.mock.invocationCallOrder[0]).toBeLessThan(
batcher.destroy.mock.invocationCallOrder[0],
);
});

it('runs listener cleanup if disposal wins the initialization race', async () => {
storeMocks.store = {};
const listenerInitialization = createDeferred<() => void>();
const cleanup = vi.fn();
storeMocks.initializeEventListeners.mockReturnValue(listenerInitialization.promise);

const manager = FlowChatManager.getInstance();
const initializeListeners = (manager as any).initializeEventListeners();

await flushAsyncWork();
manager.destroy();
expect(cleanup).not.toHaveBeenCalled();

listenerInitialization.resolve(cleanup);
await initializeListeners;

expect(cleanup).toHaveBeenCalledTimes(1);
expect((manager as any).eventListenerInitialized).toBe(false);
expect((manager as any).eventListenerCleanup).toBeNull();
});

it('stops workspace initialization if the manager is disposed while listeners initialize', async () => {
const listenerInitialization = createDeferred<() => void>();
storeMocks.initializeEventListeners.mockReturnValue(listenerInitialization.promise);
storeMocks.store = {
registerPersistUnreadCompletionCallback: vi.fn(),
loadSessionMetadataPage: vi.fn(async () => ({
sessions: [],
totalTopLevelCount: 0,
hasMore: false,
})),
};

const manager = FlowChatManager.getInstance();
const initialize = manager.initialize('D:/workspace/BitFun');

await flushAsyncWork();
manager.destroy();
listenerInitialization.resolve(vi.fn());

await expect(initialize).resolves.toBe(false);
expect(storeMocks.store.registerPersistUnreadCompletionCallback).not.toHaveBeenCalled();
expect(storeMocks.store.loadSessionMetadataPage).not.toHaveBeenCalled();
});

it('reuses concurrent initialization for the same workspace history restore', async () => {
const metadataLoad = createDeferred<{
sessions: unknown[];
Expand Down
60 changes: 58 additions & 2 deletions src/web-ui/src/flow_chat/services/FlowChatManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@
const log = createLogger('FlowChatManager');

export class FlowChatManager {
private static instance: FlowChatManager;
private static instance: FlowChatManager | null = null;
private context: FlowChatContext;
private agentService: AgentService;
private eventListenerInitialized = false;
private eventListenerInitializationPromise: Promise<void> | null = null;
private eventListenerCleanup: (() => void) | null = null;
private initializationRequests = new Map<string, Promise<boolean>>();
private latestInitializationRequestKey: string | null = null;
private disposed = false;

private constructor() {
this.context = {
Expand Down Expand Up @@ -96,12 +97,22 @@
return FlowChatManager.instance;
}

public static disposeInstance(): void {
FlowChatManager.instance?.destroy();
FlowChatManager.instance = null;
}

async initialize(
workspacePath: string,
preferredMode?: string,
remoteConnectionId?: string,
remoteSshHost?: string
): Promise<boolean> {
if (this.disposed) {
log.debug('Ignoring initialize call on disposed FlowChatManager', { workspacePath });
return false;
}

const requestKey = FlowChatManager.createInitializationRequestKey(
workspacePath,
preferredMode,
Expand All @@ -115,7 +126,7 @@
}

let request: Promise<boolean>;
request = this.initializeWorkspace(

Check warning on line 129 in src/web-ui/src/flow_chat/services/FlowChatManager.ts

View workflow job for this annotation

GitHub Actions / Frontend Build

'request' is never reassigned. Use 'const' instead
requestKey,
workspacePath,
preferredMode,
Expand Down Expand Up @@ -153,6 +164,9 @@
): Promise<boolean> {
try {
await this.initializeEventListeners();
if (this.disposed) {
return false;
}

// Register callback to persist unread completion changes to backend
this.context.flowChatStore.registerPersistUnreadCompletionCallback(
Expand All @@ -171,6 +185,9 @@
remoteSshHost,
'flow_chat_manager'
);
if (this.disposed) {
return false;
}

const sessionMatchesWorkspace = (session: {
workspacePath?: string;
Expand Down Expand Up @@ -207,6 +224,9 @@
remoteSshHost,
'flow_chat_manager_preferred_mode'
);
if (this.disposed) {
return false;
}
state = this.context.flowChatStore.getState();
workspaceSessions = Array.from(state.sessions.values()).filter(sessionMatchesWorkspace);
if (workspaceSessions.some(session => session.mode === preferredMode) || !nextPage.hasMore) {
Expand Down Expand Up @@ -251,6 +271,9 @@
latestSession.remoteSshHost,
{ deferFullHistoryUntilActive: true },
);
if (this.disposed) {
return false;
}
}

if (!isCurrentInitializationRequest()) {
Expand Down Expand Up @@ -289,6 +312,9 @@
}

private async initializeEventListeners(): Promise<void> {
if (this.disposed) {
return;
}
if (this.eventListenerInitialized) {
return;
}
Expand All @@ -297,11 +323,17 @@
}

this.eventListenerInitializationPromise = (async () => {
this.eventListenerCleanup = await initializeEventListeners(
const cleanup = await initializeEventListeners(
this.context,
(sessionId, turnId, result) => this.handleTodoWriteResult(sessionId, turnId, result)
);

if (this.disposed) {
cleanup();
return;
}

this.eventListenerCleanup = cleanup;
this.eventListenerInitialized = true;
})();

Expand All @@ -321,7 +353,24 @@
this.eventListenerInitializationPromise = null;
}

public destroy(): void {
if (this.disposed) {
return;
}

this.context.eventBatcher.flushNow();
this.disposed = true;
this.initializationRequests.clear();
this.latestInitializationRequestKey = null;
this.cleanupEventListeners();
this.context.eventBatcher.destroy();
}

private processBatchedEvents(events: Array<{ key: string; payload: any }>): void {
if (this.disposed) {
return;
}

processBatchedEvents(
this.context,
events,
Expand Down Expand Up @@ -680,5 +729,12 @@
}
}
}

if (import.meta.hot) {
import.meta.hot.dispose(() => {
FlowChatManager.disposeInstance();
});
}

export const flowChatManager = FlowChatManager.getInstance();
export default flowChatManager;
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,38 @@ describe('resolveDialogTurnDisplayContent', () => {
});
});

describe('mergeParamsPartialEventData', () => {
it('appends Write argument deltas within a batch', () => {
const merged = __test_only__.mergeParamsPartialEventData(
{
sessionId: 'session-1',
turnId: 'turn-1',
roundId: 'round-1',
toolEvent: {
event_type: 'ParamsPartial',
tool_id: 'tool-1',
tool_name: 'Write',
params: '{"file_path":"src/app.ts","content":"',
},
},
{
sessionId: 'session-1',
turnId: 'turn-1',
roundId: 'round-1',
toolEvent: {
event_type: 'ParamsPartial',
tool_id: 'tool-1',
tool_name: 'Write',
params: 'hello',
},
},
);

expect((merged.toolEvent as any).params).toBe('{"file_path":"src/app.ts","content":"hello');
});

});

describe('shouldProcessEvent', () => {
const mockSessionId = 'test-session';
const mockTurnId = 'test-turn';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,29 @@ function resolveDialogTurnDisplayContent(
return resolveThreadGoalUserMessageDisplay(base, metadata);
}

function mergeParamsPartialEventData(
existing: ToolEventData,
incoming: ToolEventData,
): ToolEventData {
const existingToolEvent = existing.toolEvent as ParamsPartialToolEvent;
const incomingToolEvent = incoming.toolEvent as ParamsPartialToolEvent;
const existingParams = normalizeParamsPartialFragment(existingToolEvent.params);
const incomingParams = normalizeParamsPartialFragment(incomingToolEvent.params);

return {
...existing,
...incoming,
toolEvent: {
...existingToolEvent,
...incomingToolEvent,
params: existingParams + incomingParams,
},
};
}

export const __test_only__ = {
resolveDialogTurnDisplayContent,
mergeParamsPartialEventData,
};

function shouldMarkUnreadCompletion(sessionId: string): boolean {
Expand Down Expand Up @@ -1619,15 +1640,7 @@ function handleToolEvent(
key,
eventData,
'accumulate',
(existing, incoming) => ({
...existing,
toolEvent: {
...(existing.toolEvent as ParamsPartialToolEvent),
params:
normalizeParamsPartialFragment((existing.toolEvent as ParamsPartialToolEvent).params) +
normalizeParamsPartialFragment((incoming.toolEvent as ParamsPartialToolEvent).params)
}
})
mergeParamsPartialEventData,
);
} else {
context.eventBatcher.add(key, eventData, 'replace');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ function applyParamsPartial(
if (!incomingParams) {
return;
}
const isWriteFullParamsSnapshot = isWriteTool && incomingParams.trimStart().startsWith('{');
const newBuffer = isWriteFullParamsSnapshot ? incomingParams : prevBuffer + incomingParams;
const newBuffer = prevBuffer + incomingParams;

let parsedParams: Record<string, any> = {};
try {
Expand Down
Loading
Loading