From baa21a03be36b0a0f7d61449bf7accddcff23787 Mon Sep 17 00:00:00 2001 From: Damian Momot Date: Wed, 27 May 2026 02:16:31 -0700 Subject: [PATCH] fix: Fix ADK Runner race condition for sequential tool execution Persist agent events to the session synchronously within each LLM step in BaseLlmFlow.run(), so the next step does not start before the previous step's events have been appended. Both BaseLlmFlow.run() and Runner skip the duplicate appendEvent for events already present in the session (by id), so events emitted by a transferred sub-agent are appended exactly once. PiperOrigin-RevId: 921989444 --- .../adk/flows/llmflows/BaseLlmFlow.java | 26 +++- .../java/com/google/adk/runner/Runner.java | 29 ++-- .../com/google/adk/sessions/SessionUtils.java | 27 ++++ .../com/google/adk/runner/RunnerTest.java | 147 ++++++++++++++++++ 4 files changed, 217 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java index dffba0e80..d56a6d666 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java @@ -37,6 +37,7 @@ import com.google.adk.models.LlmRegistry; import com.google.adk.models.LlmRequest; import com.google.adk.models.LlmResponse; +import com.google.adk.sessions.SessionUtils; import com.google.adk.telemetry.Tracing; import com.google.adk.tools.BaseTool; import com.google.adk.tools.BaseToolset; @@ -503,7 +504,30 @@ public Flowable run(InvocationContext invocationContext) { private Flowable run( Context spanContext, InvocationContext invocationContext, int stepsCompleted) { - Flowable currentStepEvents = runOneStep(spanContext, invocationContext).cache(); + // Persist each event to the session synchronously within the step so that the next step does + // not start before the previous step's events have been appended. Without this, the deferred + // continuation (concatWith below) subscribes synchronously on runOneStep's upstream onComplete + // signal, which can race with the downstream consumer's appendEvent calls in Runner. + // + // The Runner-side appendEvent still runs and deduplicates this event by id, so plugin + // onEventCallback and non-LlmAgent paths are unaffected. + // + // Events emitted by a transferred sub-agent's nested BaseLlmFlow.run() have already been + // appended by that nested flow, so skip them here to avoid duplicates. Deduplication is done + // by event id against the session's existing events. + Flowable currentStepEvents = + runOneStep(spanContext, invocationContext) + .concatMap( + event -> { + if (SessionUtils.isEventAlreadyAppended(invocationContext.session(), event)) { + return Flowable.just(event); + } + return invocationContext + .sessionService() + .appendEvent(invocationContext.session(), event) + .toFlowable(); + }) + .cache(); if (stepsCompleted + 1 >= maxSteps) { logger.debug("Ending flow execution because max steps reached."); return currentStepEvents; diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index 1ab101398..45f00ae2f 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -38,6 +38,7 @@ import com.google.adk.sessions.InMemorySessionService; import com.google.adk.sessions.Session; import com.google.adk.sessions.SessionKey; +import com.google.adk.sessions.SessionUtils; import com.google.adk.summarizer.EventsCompactionConfig; import com.google.adk.summarizer.LlmEventSummarizer; import com.google.adk.summarizer.SlidingWindowEventCompactor; @@ -581,19 +582,25 @@ private Flowable runAgentWithUpdatedSession( .agent() .runAsync(contextWithUpdatedSession) .concatMap( - agentEvent -> - this.sessionService - .appendEvent(updatedSession, agentEvent) - .flatMap( - registeredEvent -> { - // TODO: remove this hack after deprecating runAsync with Session. - copySessionStates(updatedSession, initialContext.session()); - return contextWithUpdatedSession + agentEvent -> { + // TODO: remove this hack after deprecating runAsync with Session. + copySessionStates(updatedSession, initialContext.session()); + // BaseLlmFlow appends events synchronously to fix a race where the next LLM + // step would otherwise start before the previous step's events were persisted. + // Skip the duplicate append here so the event is not added twice. + Single appendOrSkip = + SessionUtils.isEventAlreadyAppended(updatedSession, agentEvent) + ? Single.just(agentEvent) + : this.sessionService.appendEvent(updatedSession, agentEvent); + return appendOrSkip + .flatMap( + registeredEvent -> + contextWithUpdatedSession .pluginManager() .onEventCallback(contextWithUpdatedSession, registeredEvent) - .defaultIfEmpty(registeredEvent); - }) - .toFlowable()); + .defaultIfEmpty(registeredEvent)) + .toFlowable(); + }); // If beforeRunCallback returns content, emit it and skip agent Context capturedContext = Context.current(); diff --git a/core/src/main/java/com/google/adk/sessions/SessionUtils.java b/core/src/main/java/com/google/adk/sessions/SessionUtils.java index 1aeca98c9..ef35de21a 100644 --- a/core/src/main/java/com/google/adk/sessions/SessionUtils.java +++ b/core/src/main/java/com/google/adk/sessions/SessionUtils.java @@ -16,6 +16,7 @@ package com.google.adk.sessions; +import com.google.adk.events.Event; import com.google.common.collect.ImmutableList; import com.google.genai.types.Blob; import com.google.genai.types.Content; @@ -31,6 +32,32 @@ public final class SessionUtils { public SessionUtils() {} + /** + * Returns true if an event with the same id is already present in {@code session.events()}. + * + *

Used to deduplicate {@code appendEvent} calls when the same event flows through multiple + * append points (e.g. {@code BaseLlmFlow.run} for a transferred sub-agent and the parent flow, or + * {@code BaseLlmFlow.run} and {@code Runner}). + */ + public static boolean isEventAlreadyAppended(Session session, Event event) { + String eventId = event.id(); + if (eventId == null) { + return false; + } + List events = session.events(); + if (events == null || events.isEmpty()) { + return false; + } + synchronized (events) { + for (Event existing : events) { + if (eventId.equals(existing.id())) { + return true; + } + } + } + return false; + } + /** Base64-encodes inline blobs in content. */ public static Content encodeContent(Content content) { List encodedParts = new ArrayList<>(); diff --git a/core/src/test/java/com/google/adk/runner/RunnerTest.java b/core/src/test/java/com/google/adk/runner/RunnerTest.java index 95718e3e0..f03082075 100644 --- a/core/src/test/java/com/google/adk/runner/RunnerTest.java +++ b/core/src/test/java/com/google/adk/runner/RunnerTest.java @@ -38,6 +38,7 @@ import static org.mockito.Mockito.when; import com.google.adk.agents.BaseAgent; +import com.google.adk.agents.Callbacks.AfterModelCallback; import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LiveRequestQueue; import com.google.adk.agents.LlmAgent; @@ -47,9 +48,14 @@ import com.google.adk.artifacts.BaseArtifactService; import com.google.adk.events.Event; import com.google.adk.flows.llmflows.Functions; +import com.google.adk.models.LlmRequest; import com.google.adk.models.LlmResponse; import com.google.adk.plugins.BasePlugin; import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.GetSessionConfig; +import com.google.adk.sessions.InMemorySessionService; +import com.google.adk.sessions.ListEventsResponse; +import com.google.adk.sessions.ListSessionsResponse; import com.google.adk.sessions.Session; import com.google.adk.sessions.SessionKey; import com.google.adk.summarizer.EventsCompactionConfig; @@ -85,6 +91,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -860,6 +867,146 @@ public void runAsync_concurrentCalls_firstFails_secondSucceeds() throws Exceptio subscriber2.assertValue(agentEvent); } + /** + * A slow appendEvent must not let the next LLM step start with a stale session missing the + * previous step's function-response event. + */ + @Test + public void runAsync_slowAppendEvent_doesNotCauseStaleSessionInNextStep() throws Exception { + TestLlm raceTestLlm = + createTestLlm( + createFunctionCallLlmResponse("call_1", echoTool.name(), ImmutableMap.of("arg", "v1")), + createTextLlmResponse("done")); + + LlmAgent agentForRace = + createTestAgentBuilder(raceTestLlm).tools(ImmutableList.of(echoTool)).build(); + + BaseSessionService delayedSessionService = + new AppendDelayingSessionService(new InMemorySessionService(), 50); + + Runner runnerForRace = + Runner.builder() + .app(App.builder().name("test").rootAgent(agentForRace).build()) + .sessionService(delayedSessionService) + .build(); + Session raceSession = + runnerForRace.sessionService().createSession("test", "user").blockingGet(); + + var unused = + runnerForRace + .runAsync("user", raceSession.id(), createContent("start")) + .toList() + .blockingGet(); + + ImmutableList requests = ImmutableList.copyOf(raceTestLlm.getRequests()); + assertThat(requests).hasSize(2); + + // Second LLM request must see the function response from step 1. + boolean foundToolResponse = + requests.get(1).contents().stream() + .flatMap(c -> c.parts().stream().flatMap(List::stream)) + .anyMatch(part -> part.functionResponse().isPresent()); + assertThat(foundToolResponse).isTrue(); + } + + /** + * When an LlmAgent transfers control to a sub-LlmAgent, the sub-agent's events flow back up + * through the parent's {@code BaseLlmFlow.run()} pipeline. Each event must be appended to the + * session exactly once. + */ + @Test + public void runAsync_transferToSubAgent_eventsAppendedOnce() throws Exception { + LlmAgent subAgent = + createTestAgentBuilder(createTestLlm(createTextLlmResponse("sub response"))) + .name("sub-agent") + .build(); + + // Force a transfer to sub-agent using an afterModelCallback. + AfterModelCallback transferCallback = + (ctx, response) -> { + ctx.eventActions().setTransferToAgent(subAgent.name()); + return Maybe.empty(); + }; + + TestLlm rootTestLlm = createTestLlm(createTextLlmResponse("initial")); + LlmAgent rootAgent = + createTestAgentBuilder(rootTestLlm) + .subAgents(subAgent) + .afterModelCallback(ImmutableList.of(transferCallback)) + .build(); + + Runner transferRunner = + Runner.builder().app(App.builder().name("test").rootAgent(rootAgent).build()).build(); + Session transferSession = + transferRunner.sessionService().createSession("test", "user").blockingGet(); + + var unused = + transferRunner + .runAsync("user", transferSession.id(), createContent("start")) + .toList() + .blockingGet(); + + Session finalSession = + transferRunner + .sessionService() + .getSession( + transferSession.appName(), + transferSession.userId(), + transferSession.id(), + Optional.empty()) + .blockingGet(); + + // Each event id should appear at most once in the session. + List eventIds = finalSession.events().stream().map(Event::id).toList(); + assertThat(eventIds).containsNoDuplicates(); + } + + /** {@link BaseSessionService} that delays {@link #appendEvent} to surface ordering bugs. */ + private static final class AppendDelayingSessionService implements BaseSessionService { + private final BaseSessionService delegate; + private final long appendDelayMs; + + AppendDelayingSessionService(BaseSessionService delegate, long appendDelayMs) { + this.delegate = delegate; + this.appendDelayMs = appendDelayMs; + } + + // Delegates to the underlying BaseSessionService createSession overload, which is itself + // deprecated; suppressed because the wrapper must preserve the same signature. + @SuppressWarnings("deprecation") + @Override + public Single createSession( + String appName, String userId, ConcurrentMap state, String sessionId) { + return delegate.createSession(appName, userId, state, sessionId); + } + + @Override + public Maybe getSession( + String appName, String userId, String sessionId, Optional config) { + return delegate.getSession(appName, userId, sessionId, config); + } + + @Override + public Single listSessions(String appName, String userId) { + return delegate.listSessions(appName, userId); + } + + @Override + public Completable deleteSession(String appName, String userId, String sessionId) { + return delegate.deleteSession(appName, userId, sessionId); + } + + @Override + public Single listEvents(String appName, String userId, String sessionId) { + return delegate.listEvents(appName, userId, sessionId); + } + + @Override + public Single appendEvent(Session session, Event event) { + return delegate.appendEvent(session, event).delay(appendDelayMs, MILLISECONDS); + } + } + @Test public void runAsync_withSessionKey_success() { var events =