diff --git a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java index dffba0e80..d56a6d666 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java @@ -37,6 +37,7 @@ import com.google.adk.models.LlmRegistry; import com.google.adk.models.LlmRequest; import com.google.adk.models.LlmResponse; +import com.google.adk.sessions.SessionUtils; import com.google.adk.telemetry.Tracing; import com.google.adk.tools.BaseTool; import com.google.adk.tools.BaseToolset; @@ -503,7 +504,30 @@ public Flowable run(InvocationContext invocationContext) { private Flowable run( Context spanContext, InvocationContext invocationContext, int stepsCompleted) { - Flowable currentStepEvents = runOneStep(spanContext, invocationContext).cache(); + // Persist each event to the session synchronously within the step so that the next step does + // not start before the previous step's events have been appended. Without this, the deferred + // continuation (concatWith below) subscribes synchronously on runOneStep's upstream onComplete + // signal, which can race with the downstream consumer's appendEvent calls in Runner. + // + // The Runner-side appendEvent still runs and deduplicates this event by id, so plugin + // onEventCallback and non-LlmAgent paths are unaffected. + // + // Events emitted by a transferred sub-agent's nested BaseLlmFlow.run() have already been + // appended by that nested flow, so skip them here to avoid duplicates. Deduplication is done + // by event id against the session's existing events. + Flowable currentStepEvents = + runOneStep(spanContext, invocationContext) + .concatMap( + event -> { + if (SessionUtils.isEventAlreadyAppended(invocationContext.session(), event)) { + return Flowable.just(event); + } + return invocationContext + .sessionService() + .appendEvent(invocationContext.session(), event) + .toFlowable(); + }) + .cache(); if (stepsCompleted + 1 >= maxSteps) { logger.debug("Ending flow execution because max steps reached."); return currentStepEvents; diff --git a/core/src/main/java/com/google/adk/runner/Runner.java b/core/src/main/java/com/google/adk/runner/Runner.java index 1ab101398..45f00ae2f 100644 --- a/core/src/main/java/com/google/adk/runner/Runner.java +++ b/core/src/main/java/com/google/adk/runner/Runner.java @@ -38,6 +38,7 @@ import com.google.adk.sessions.InMemorySessionService; import com.google.adk.sessions.Session; import com.google.adk.sessions.SessionKey; +import com.google.adk.sessions.SessionUtils; import com.google.adk.summarizer.EventsCompactionConfig; import com.google.adk.summarizer.LlmEventSummarizer; import com.google.adk.summarizer.SlidingWindowEventCompactor; @@ -581,19 +582,25 @@ private Flowable runAgentWithUpdatedSession( .agent() .runAsync(contextWithUpdatedSession) .concatMap( - agentEvent -> - this.sessionService - .appendEvent(updatedSession, agentEvent) - .flatMap( - registeredEvent -> { - // TODO: remove this hack after deprecating runAsync with Session. - copySessionStates(updatedSession, initialContext.session()); - return contextWithUpdatedSession + agentEvent -> { + // TODO: remove this hack after deprecating runAsync with Session. + copySessionStates(updatedSession, initialContext.session()); + // BaseLlmFlow appends events synchronously to fix a race where the next LLM + // step would otherwise start before the previous step's events were persisted. + // Skip the duplicate append here so the event is not added twice. + Single appendOrSkip = + SessionUtils.isEventAlreadyAppended(updatedSession, agentEvent) + ? Single.just(agentEvent) + : this.sessionService.appendEvent(updatedSession, agentEvent); + return appendOrSkip + .flatMap( + registeredEvent -> + contextWithUpdatedSession .pluginManager() .onEventCallback(contextWithUpdatedSession, registeredEvent) - .defaultIfEmpty(registeredEvent); - }) - .toFlowable()); + .defaultIfEmpty(registeredEvent)) + .toFlowable(); + }); // If beforeRunCallback returns content, emit it and skip agent Context capturedContext = Context.current(); diff --git a/core/src/main/java/com/google/adk/sessions/SessionUtils.java b/core/src/main/java/com/google/adk/sessions/SessionUtils.java index 1aeca98c9..ef35de21a 100644 --- a/core/src/main/java/com/google/adk/sessions/SessionUtils.java +++ b/core/src/main/java/com/google/adk/sessions/SessionUtils.java @@ -16,6 +16,7 @@ package com.google.adk.sessions; +import com.google.adk.events.Event; import com.google.common.collect.ImmutableList; import com.google.genai.types.Blob; import com.google.genai.types.Content; @@ -31,6 +32,32 @@ public final class SessionUtils { public SessionUtils() {} + /** + * Returns true if an event with the same id is already present in {@code session.events()}. + * + *

Used to deduplicate {@code appendEvent} calls when the same event flows through multiple + * append points (e.g. {@code BaseLlmFlow.run} for a transferred sub-agent and the parent flow, or + * {@code BaseLlmFlow.run} and {@code Runner}). + */ + public static boolean isEventAlreadyAppended(Session session, Event event) { + String eventId = event.id(); + if (eventId == null) { + return false; + } + List events = session.events(); + if (events == null || events.isEmpty()) { + return false; + } + synchronized (events) { + for (Event existing : events) { + if (eventId.equals(existing.id())) { + return true; + } + } + } + return false; + } + /** Base64-encodes inline blobs in content. */ public static Content encodeContent(Content content) { List encodedParts = new ArrayList<>(); diff --git a/core/src/test/java/com/google/adk/runner/RunnerTest.java b/core/src/test/java/com/google/adk/runner/RunnerTest.java index 95718e3e0..f03082075 100644 --- a/core/src/test/java/com/google/adk/runner/RunnerTest.java +++ b/core/src/test/java/com/google/adk/runner/RunnerTest.java @@ -38,6 +38,7 @@ import static org.mockito.Mockito.when; import com.google.adk.agents.BaseAgent; +import com.google.adk.agents.Callbacks.AfterModelCallback; import com.google.adk.agents.InvocationContext; import com.google.adk.agents.LiveRequestQueue; import com.google.adk.agents.LlmAgent; @@ -47,9 +48,14 @@ import com.google.adk.artifacts.BaseArtifactService; import com.google.adk.events.Event; import com.google.adk.flows.llmflows.Functions; +import com.google.adk.models.LlmRequest; import com.google.adk.models.LlmResponse; import com.google.adk.plugins.BasePlugin; import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.GetSessionConfig; +import com.google.adk.sessions.InMemorySessionService; +import com.google.adk.sessions.ListEventsResponse; +import com.google.adk.sessions.ListSessionsResponse; import com.google.adk.sessions.Session; import com.google.adk.sessions.SessionKey; import com.google.adk.summarizer.EventsCompactionConfig; @@ -85,6 +91,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -860,6 +867,146 @@ public void runAsync_concurrentCalls_firstFails_secondSucceeds() throws Exceptio subscriber2.assertValue(agentEvent); } + /** + * A slow appendEvent must not let the next LLM step start with a stale session missing the + * previous step's function-response event. + */ + @Test + public void runAsync_slowAppendEvent_doesNotCauseStaleSessionInNextStep() throws Exception { + TestLlm raceTestLlm = + createTestLlm( + createFunctionCallLlmResponse("call_1", echoTool.name(), ImmutableMap.of("arg", "v1")), + createTextLlmResponse("done")); + + LlmAgent agentForRace = + createTestAgentBuilder(raceTestLlm).tools(ImmutableList.of(echoTool)).build(); + + BaseSessionService delayedSessionService = + new AppendDelayingSessionService(new InMemorySessionService(), 50); + + Runner runnerForRace = + Runner.builder() + .app(App.builder().name("test").rootAgent(agentForRace).build()) + .sessionService(delayedSessionService) + .build(); + Session raceSession = + runnerForRace.sessionService().createSession("test", "user").blockingGet(); + + var unused = + runnerForRace + .runAsync("user", raceSession.id(), createContent("start")) + .toList() + .blockingGet(); + + ImmutableList requests = ImmutableList.copyOf(raceTestLlm.getRequests()); + assertThat(requests).hasSize(2); + + // Second LLM request must see the function response from step 1. + boolean foundToolResponse = + requests.get(1).contents().stream() + .flatMap(c -> c.parts().stream().flatMap(List::stream)) + .anyMatch(part -> part.functionResponse().isPresent()); + assertThat(foundToolResponse).isTrue(); + } + + /** + * When an LlmAgent transfers control to a sub-LlmAgent, the sub-agent's events flow back up + * through the parent's {@code BaseLlmFlow.run()} pipeline. Each event must be appended to the + * session exactly once. + */ + @Test + public void runAsync_transferToSubAgent_eventsAppendedOnce() throws Exception { + LlmAgent subAgent = + createTestAgentBuilder(createTestLlm(createTextLlmResponse("sub response"))) + .name("sub-agent") + .build(); + + // Force a transfer to sub-agent using an afterModelCallback. + AfterModelCallback transferCallback = + (ctx, response) -> { + ctx.eventActions().setTransferToAgent(subAgent.name()); + return Maybe.empty(); + }; + + TestLlm rootTestLlm = createTestLlm(createTextLlmResponse("initial")); + LlmAgent rootAgent = + createTestAgentBuilder(rootTestLlm) + .subAgents(subAgent) + .afterModelCallback(ImmutableList.of(transferCallback)) + .build(); + + Runner transferRunner = + Runner.builder().app(App.builder().name("test").rootAgent(rootAgent).build()).build(); + Session transferSession = + transferRunner.sessionService().createSession("test", "user").blockingGet(); + + var unused = + transferRunner + .runAsync("user", transferSession.id(), createContent("start")) + .toList() + .blockingGet(); + + Session finalSession = + transferRunner + .sessionService() + .getSession( + transferSession.appName(), + transferSession.userId(), + transferSession.id(), + Optional.empty()) + .blockingGet(); + + // Each event id should appear at most once in the session. + List eventIds = finalSession.events().stream().map(Event::id).toList(); + assertThat(eventIds).containsNoDuplicates(); + } + + /** {@link BaseSessionService} that delays {@link #appendEvent} to surface ordering bugs. */ + private static final class AppendDelayingSessionService implements BaseSessionService { + private final BaseSessionService delegate; + private final long appendDelayMs; + + AppendDelayingSessionService(BaseSessionService delegate, long appendDelayMs) { + this.delegate = delegate; + this.appendDelayMs = appendDelayMs; + } + + // Delegates to the underlying BaseSessionService createSession overload, which is itself + // deprecated; suppressed because the wrapper must preserve the same signature. + @SuppressWarnings("deprecation") + @Override + public Single createSession( + String appName, String userId, ConcurrentMap state, String sessionId) { + return delegate.createSession(appName, userId, state, sessionId); + } + + @Override + public Maybe getSession( + String appName, String userId, String sessionId, Optional config) { + return delegate.getSession(appName, userId, sessionId, config); + } + + @Override + public Single listSessions(String appName, String userId) { + return delegate.listSessions(appName, userId); + } + + @Override + public Completable deleteSession(String appName, String userId, String sessionId) { + return delegate.deleteSession(appName, userId, sessionId); + } + + @Override + public Single listEvents(String appName, String userId, String sessionId) { + return delegate.listEvents(appName, userId, sessionId); + } + + @Override + public Single appendEvent(Session session, Event event) { + return delegate.appendEvent(session, event).delay(appendDelayMs, MILLISECONDS); + } + } + @Test public void runAsync_withSessionKey_success() { var events =