Skip to content

Commit 06dd80c

Browse files
committed
feat(cloud-agents): add no-repo mode, fix session init race, enable follow-ups after cloud run
1 parent c316a8b commit 06dd80c

File tree

7 files changed

+154
-36
lines changed

7 files changed

+154
-36
lines changed

apps/code/src/renderer/features/command-center/components/CommandCenterSessionView.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ export function CommandCenterSessionView({
2929
promptStartedAt,
3030
isInitializing,
3131
cloudBranch,
32-
readOnlyMessage,
3332
errorTitle,
3433
errorMessage,
3534
} = useSessionViewState(taskId, task);
@@ -67,7 +66,6 @@ export function CommandCenterSessionView({
6766
onRetry={isCloud ? undefined : handleRetry}
6867
onNewSession={isCloud ? undefined : handleNewSession}
6968
isInitializing={isInitializing}
70-
readOnlyMessage={readOnlyMessage}
7169
compact
7270
/>
7371
</Flex>

apps/code/src/renderer/features/sessions/components/SessionView.tsx

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ interface SessionViewProps {
5858
onRetry?: () => void;
5959
onNewSession?: () => void;
6060
isInitializing?: boolean;
61-
readOnlyMessage?: string;
6261
slackThreadUrl?: string;
6362
compact?: boolean;
6463
}
@@ -87,7 +86,6 @@ export function SessionView({
8786
onRetry,
8887
onNewSession,
8988
isInitializing = false,
90-
readOnlyMessage,
9189
slackThreadUrl,
9290
compact = false,
9391
}: SessionViewProps) {
@@ -506,17 +504,6 @@ export function SessionView({
506504
/>
507505
</Box>
508506
</Box>
509-
) : readOnlyMessage ? (
510-
<Flex
511-
align="center"
512-
justify="center"
513-
py="2"
514-
className="border-gray-4 border-t"
515-
>
516-
<Text size="2" color="gray">
517-
{readOnlyMessage}
518-
</Text>
519-
</Flex>
520507
) : (
521508
<Box className="relative border-gray-4 border-t">
522509
<Box

apps/code/src/renderer/features/sessions/hooks/useSessionViewState.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { useCwd } from "@features/sidebar/hooks/useCwd";
22
import { useWorkspace } from "@features/workspace/hooks/useWorkspace";
33
import type { Task } from "@shared/types";
4-
import { useMemo } from "react";
54
import { useSessionForTask } from "../stores/sessionStore";
65

76
export function useSessionViewState(taskId: string, task: Task) {
@@ -20,9 +19,7 @@ export function useSessionViewState(taskId: string, task: Task) {
2019
cloudStatus === "in_progress");
2120
const isCloudRunTerminal = isCloud && !isCloudRunNotTerminal;
2221

23-
const isRunning = isCloud
24-
? isCloudRunNotTerminal
25-
: session?.status === "connected";
22+
const isRunning = isCloud ? true : session?.status === "connected";
2623
const hasError = isCloud ? false : session?.status === "error";
2724

2825
const events = session?.events ?? [];
@@ -46,12 +43,6 @@ export function useSessionViewState(taskId: string, task: Task) {
4643
? (workspace?.baseBranch ?? task.latest_run?.branch ?? null)
4744
: null;
4845

49-
const readOnlyMessage = useMemo(() => {
50-
if (!isCloud) return undefined;
51-
if (isCloudRunTerminal) return "This cloud run has finished";
52-
return undefined;
53-
}, [isCloud, isCloudRunTerminal]);
54-
5546
return {
5647
session,
5748
repoPath,
@@ -66,7 +57,6 @@ export function useSessionViewState(taskId: string, task: Task) {
6657
promptStartedAt,
6758
isInitializing,
6859
cloudBranch,
69-
readOnlyMessage,
7060
errorTitle: isCloud ? undefined : session?.errorTitle,
7161
errorMessage: isCloud ? undefined : session?.errorMessage,
7262
};

apps/code/src/renderer/features/task-detail/components/TaskLogsPanel.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
5555
promptStartedAt,
5656
isInitializing,
5757
cloudBranch,
58-
readOnlyMessage,
5958
errorTitle,
6059
errorMessage,
6160
} = useSessionViewState(taskId, task);
@@ -157,7 +156,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
157156
onRetry={isCloud ? undefined : handleRetry}
158157
onNewSession={isCloud ? undefined : handleNewSession}
159158
isInitializing={isInitializing}
160-
readOnlyMessage={readOnlyMessage}
161159
slackThreadUrl={slackThreadUrl}
162160
/>
163161
</ErrorBoundary>

packages/agent/src/server/agent-server.ts

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ export class AgentServer {
162162
private questionRelayedToSlack = false;
163163
private detectedPrUrl: string | null = null;
164164
private resumeState: ResumeState | null = null;
165+
// Guards against concurrent session initialization. autoInitializeSession() and
166+
// the GET /events SSE handler can both call initializeSession() — the SSE connection
167+
// often arrives while newSession() is still awaited (this.session is still null),
168+
// causing a second session to be created and duplicate Slack messages to be sent.
169+
private initializationPromise: Promise<void> | null = null;
170+
private pendingEvents: Record<string, unknown>[] = [];
165171

166172
private emitConsoleLog = (
167173
level: LogLevel,
@@ -264,6 +270,7 @@ export class AgentServer {
264270
await this.initializeSession(payload, sseController);
265271
} else {
266272
this.session.sseController = sseController;
273+
this.replayPendingEvents();
267274
}
268275

269276
this.sendSseEvent(sseController, {
@@ -483,6 +490,8 @@ export class AgentServer {
483490
`Processing user message (detectedPrUrl=${this.detectedPrUrl ?? "none"}): ${content.substring(0, 100)}...`,
484491
);
485492

493+
this.session.logWriter.resetTurnMessages(this.session.payload.run_id);
494+
486495
const result = await this.session.clientConnection.prompt({
487496
sessionId: this.session.acpSessionId,
488497
prompt: [{ type: "text", text: content }],
@@ -501,7 +510,31 @@ export class AgentServer {
501510

502511
this.broadcastTurnComplete(result.stopReason);
503512

504-
return { stopReason: result.stopReason };
513+
if (result.stopReason === "end_turn") {
514+
// Relay the response to Slack. For follow-ups this is the primary
515+
// delivery path — the HTTP caller only handles reactions.
516+
this.relayAgentResponse(this.session.payload).catch((err) =>
517+
this.logger.warn("Failed to relay follow-up response", err),
518+
);
519+
}
520+
521+
// Flush logs and include the assistant's response text so callers
522+
// (e.g. Slack follow-up forwarding) can extract it without racing
523+
// against async log persistence to object storage.
524+
let assistantMessage: string | undefined;
525+
try {
526+
await this.session.logWriter.flush(this.session.payload.run_id);
527+
assistantMessage = this.session.logWriter.getFullAgentResponse(
528+
this.session.payload.run_id,
529+
);
530+
} catch {
531+
this.logger.warn("Failed to extract assistant message from logs");
532+
}
533+
534+
return {
535+
stopReason: result.stopReason,
536+
...(assistantMessage && { assistant_message: assistantMessage }),
537+
};
505538
}
506539

507540
case POSTHOG_NOTIFICATIONS.CANCEL:
@@ -530,6 +563,40 @@ export class AgentServer {
530563
private async initializeSession(
531564
payload: JwtPayload,
532565
sseController: SseController | null,
566+
): Promise<void> {
567+
// Race condition guard: autoInitializeSession() starts first, but while it awaits
568+
// newSession() (which takes ~1-2s for MCP metadata fetch), the Temporal relay connects
569+
// to GET /events. That handler sees this.session === null and calls initializeSession()
570+
// again, creating a duplicate session that sends the same prompt twice — resulting in
571+
// duplicate Slack messages. This lock ensures the second caller waits for the first
572+
// initialization to finish and reuses the session.
573+
if (this.initializationPromise) {
574+
this.logger.info("Waiting for in-progress initialization", {
575+
runId: payload.run_id,
576+
});
577+
await this.initializationPromise;
578+
// After waiting, just attach the SSE controller if needed
579+
if (this.session && sseController) {
580+
this.session.sseController = sseController;
581+
this.replayPendingEvents();
582+
}
583+
return;
584+
}
585+
586+
this.initializationPromise = this._doInitializeSession(
587+
payload,
588+
sseController,
589+
);
590+
try {
591+
await this.initializationPromise;
592+
} finally {
593+
this.initializationPromise = null;
594+
}
595+
}
596+
597+
private async _doInitializeSession(
598+
payload: JwtPayload,
599+
sseController: SseController | null,
533600
): Promise<void> {
534601
if (this.session) {
535602
await this.cleanupSession();
@@ -770,6 +837,8 @@ export class AgentServer {
770837
usedInitialPromptOverride: !!initialPromptOverride,
771838
});
772839

840+
this.session.logWriter.resetTurnMessages(payload.run_id);
841+
773842
const result = await this.session.clientConnection.prompt({
774843
sessionId: this.session.acpSessionId,
775844
prompt: [{ type: "text", text: initialPrompt }],
@@ -809,8 +878,8 @@ export class AgentServer {
809878
const pendingUserMessage = this.getPendingUserMessage(taskRun);
810879

811880
const sandboxContext = this.resumeState.snapshotApplied
812-
? `The sandbox environment (all files, packages, and code changes) has been fully restored from a snapshot.`
813-
: `The sandbox could not be restored from a snapshot (it may have expired). You are starting with a fresh environment but have the full conversation history below.`;
881+
? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.`
882+
: `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`;
814883

815884
let resumePrompt: string;
816885
if (pendingUserMessage) {
@@ -842,6 +911,8 @@ export class AgentServer {
842911
// Clear resume state so it's not reused
843912
this.resumeState = null;
844913

914+
this.session.logWriter.resetTurnMessages(payload.run_id);
915+
845916
const result = await this.session.clientConnection.prompt({
846917
sessionId: this.session.acpSessionId,
847918
prompt: [{ type: "text", text: resumePrompt }],
@@ -852,6 +923,10 @@ export class AgentServer {
852923
});
853924

854925
this.broadcastTurnComplete(result.stopReason);
926+
927+
if (result.stopReason === "end_turn") {
928+
await this.relayAgentResponse(payload);
929+
}
855930
} catch (error) {
856931
this.logger.error("Failed to send resume message", error);
857932
if (this.session) {
@@ -992,6 +1067,27 @@ Important:
9921067
`;
9931068
}
9941069

1070+
if (!this.config.repositoryPath) {
1071+
return `
1072+
# Cloud Task Execution — No Repository Mode
1073+
1074+
You are a helpful assistant with access to PostHog via MCP tools. You can help with both code tasks and data/analytics questions.
1075+
1076+
When the user asks about analytics, data, metrics, events, funnels, dashboards, feature flags, experiments, or anything PostHog-related:
1077+
- Use your PostHog MCP tools to query data, search insights, and provide real answers
1078+
- Do NOT tell the user to check an external analytics platform — you ARE the analytics platform
1079+
- Use tools like insight-query, query-run, event-definitions-list, and others to answer questions directly
1080+
1081+
When the user asks for code changes or software engineering tasks:
1082+
- Let them know you can help but don't have a repository connected for this session
1083+
- Offer to write code snippets, scripts, or provide guidance
1084+
1085+
Important:
1086+
- Do NOT create branches, commits, or pull requests in this mode.
1087+
- Prefer using MCP tools to answer questions with real data over giving generic advice.
1088+
`;
1089+
}
1090+
9951091
return `
9961092
# Cloud Task Execution
9971093
@@ -1124,6 +1220,12 @@ Important:
11241220
},
11251221
};
11261222
},
1223+
extNotification: async (
1224+
method: string,
1225+
params: Record<string, unknown>,
1226+
) => {
1227+
this.logger.debug("Extension notification", { method, params });
1228+
},
11271229
sessionUpdate: async (params: {
11281230
sessionId: string;
11291231
update?: Record<string, unknown>;
@@ -1176,7 +1278,7 @@ Important:
11761278
});
11771279
}
11781280

1179-
const message = this.session.logWriter.getLastAgentMessage(payload.run_id);
1281+
const message = this.session.logWriter.getFullAgentResponse(payload.run_id);
11801282
if (!message) {
11811283
this.logger.warn("No agent message found for Slack relay", {
11821284
taskId: payload.task_id,
@@ -1385,6 +1487,7 @@ Important:
13851487
this.session.sseController.close();
13861488
}
13871489

1490+
this.pendingEvents = [];
13881491
this.session = null;
13891492
}
13901493

@@ -1444,6 +1547,18 @@ Important:
14441547
private broadcastEvent(event: Record<string, unknown>): void {
14451548
if (this.session?.sseController) {
14461549
this.sendSseEvent(this.session.sseController, event);
1550+
} else if (this.session) {
1551+
// Buffer events during initialization (sseController not yet attached)
1552+
this.pendingEvents.push(event);
1553+
}
1554+
}
1555+
1556+
private replayPendingEvents(): void {
1557+
if (!this.session?.sseController || this.pendingEvents.length === 0) return;
1558+
const events = this.pendingEvents;
1559+
this.pendingEvents = [];
1560+
for (const event of events) {
1561+
this.sendSseEvent(this.session.sseController, event);
14471562
}
14481563
}
14491564

packages/agent/src/server/question-relay.test.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ describe("Question relay", () => {
248248
payload: TEST_PAYLOAD,
249249
logWriter: {
250250
flush: vi.fn().mockResolvedValue(undefined),
251-
getLastAgentMessage: vi.fn().mockReturnValue("agent response"),
251+
getFullAgentResponse: vi.fn().mockReturnValue("agent response"),
252252
isRegistered: vi.fn().mockReturnValue(true),
253253
},
254254
};
@@ -269,7 +269,7 @@ describe("Question relay", () => {
269269
payload: TEST_PAYLOAD,
270270
logWriter: {
271271
flush: vi.fn().mockResolvedValue(undefined),
272-
getLastAgentMessage: vi.fn().mockReturnValue("agent response"),
272+
getFullAgentResponse: vi.fn().mockReturnValue("agent response"),
273273
isRegistered: vi.fn().mockReturnValue(true),
274274
},
275275
};
@@ -293,7 +293,7 @@ describe("Question relay", () => {
293293
payload: TEST_PAYLOAD,
294294
logWriter: {
295295
flush: vi.fn().mockResolvedValue(undefined),
296-
getLastAgentMessage: vi.fn().mockReturnValue(null),
296+
getFullAgentResponse: vi.fn().mockReturnValue(null),
297297
isRegistered: vi.fn().mockReturnValue(true),
298298
},
299299
};
@@ -323,6 +323,13 @@ describe("Question relay", () => {
323323
payload: TEST_PAYLOAD,
324324
acpSessionId: "acp-session",
325325
clientConnection: { prompt: promptSpy },
326+
logWriter: {
327+
flushAll: vi.fn().mockResolvedValue(undefined),
328+
getFullAgentResponse: vi.fn().mockReturnValue(null),
329+
resetTurnMessages: vi.fn(),
330+
flush: vi.fn().mockResolvedValue(undefined),
331+
isRegistered: vi.fn().mockReturnValue(true),
332+
},
326333
};
327334

328335
await server.sendInitialTaskMessage(TEST_PAYLOAD);
@@ -350,6 +357,13 @@ describe("Question relay", () => {
350357
payload: TEST_PAYLOAD,
351358
acpSessionId: "acp-session",
352359
clientConnection: { prompt: promptSpy },
360+
logWriter: {
361+
flushAll: vi.fn().mockResolvedValue(undefined),
362+
getFullAgentResponse: vi.fn().mockReturnValue(null),
363+
resetTurnMessages: vi.fn(),
364+
flush: vi.fn().mockResolvedValue(undefined),
365+
isRegistered: vi.fn().mockReturnValue(true),
366+
},
353367
};
354368

355369
await server.sendInitialTaskMessage(TEST_PAYLOAD);

0 commit comments

Comments
 (0)