From 20b370862f72c37f5cf3de9c518c5c29a8d8d1ba Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Thu, 22 Jan 2026 22:38:26 -0800 Subject: [PATCH] feat: Introduce TailRetentionEventCompactor to compact and retain the tail of the event stream Provide a way to manage the size of an event stream Specifically, it: * Keeps the retentionSize most recent events raw. * Compacts all events that never compacted and older than the retained tail, including the most recent compaction events, into a new summary event. * Appends this new summary event to the end of the event stream. PiperOrigin-RevId: 859936701 --- .../TailRetentionEventCompactor.java | 193 ++++++++++++++++++ .../TailRetentionEventCompactorTest.java | 193 ++++++++++++++++++ 2 files changed, 386 insertions(+) create mode 100644 core/src/main/java/com/google/adk/summarizer/TailRetentionEventCompactor.java create mode 100644 core/src/test/java/com/google/adk/summarizer/TailRetentionEventCompactorTest.java diff --git a/core/src/main/java/com/google/adk/summarizer/TailRetentionEventCompactor.java b/core/src/main/java/com/google/adk/summarizer/TailRetentionEventCompactor.java new file mode 100644 index 00000000..c740351b --- /dev/null +++ b/core/src/main/java/com/google/adk/summarizer/TailRetentionEventCompactor.java @@ -0,0 +1,193 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.summarizer; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.adk.events.Event; +import com.google.adk.events.EventCompaction; +import com.google.adk.sessions.BaseSessionService; +import com.google.adk.sessions.Session; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Maybe; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class performs event compaction by retaining the tail of the event stream. + * + * + * + *

This compactor produces a rolling summary. Each new compaction event includes the content of + * the previous compaction event (if any) along with new events, effectively superseding all prior + * compactions. + */ +public final class TailRetentionEventCompactor implements EventCompactor { + + private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class); + + private final BaseEventSummarizer summarizer; + private final int retentionSize; + + public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) { + this.summarizer = summarizer; + this.retentionSize = retentionSize; + } + + @Override + public Completable compact(Session session, BaseSessionService sessionService) { + checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction"); + logger.debug("Running tail retention event compaction for session {}", session.id()); + + return Completable.fromMaybe( + getCompactionEvents(session.events(), retentionSize) + .flatMap(summarizer::summarizeEvents) + .flatMapSingle(e -> sessionService.appendEvent(session, e))); + } + + /** + * Identifies events to be compacted based on the tail retention strategy. + * + *

This method iterates backwards through the event list to find the most recent compaction + * event (if any) and collects all uncompacted events that occurred after the range covered by + * that compaction. It then applies the retention policy, excluding the most recent {@code + * retentionSize} events from being compacted. + * + *

Basic Scenario: + * + *

+ * + *

Advanced Scenario (Handling Gaps): + * + *

Consider an edge case where retention size is 3. Event E4 appears before the last compaction + * event (C2) and even the one prior (C1), but remains uncompacted and must be included in the + * third compaction (C3). + * + *

+ * + *

Execution with Retention = 3: + * + *

    + *
  1. The method scans backward: E9, C2, E7, E6, C1, E4... + *
  2. C2 is identified as the most recent compaction event (end timestamp T=3). + *
  3. E9, E7, E6 are collected as they are newer than T=3. + *
  4. C1 is ignored as we only care about the boundary set by the latest compaction. + *
  5. E4 (T=4) is collected because it is newer than T=3. + *
  6. Scanning stops at E3 as it is covered by C2 (timestamp <= T=3). + *
  7. The initial list of events to summarize: [E9, E7, E6, E4]. + *
  8. After appending the compaction event C2, the list becomes: [E9, E7, E6, E4, C2] + *
  9. Reversing the list: [C2, E4, E6, E7, E9]. + *
  10. Applying retention (keep last 3): E6, E7, E9 are removed from the summary list. + *
  11. Final Output: {@code [C2, E4]}. E4 and the previous summary C2 will be compacted + * together. The new compaction event will cover the range from the start of the included + * compaction event (C2, T=1) to the end of the new events (E4, T=4). + *
+ */ + static Maybe> getCompactionEvents(List events, int retentionSize) { + long compactionEndTimestamp = Long.MIN_VALUE; + Event lastCompactionEvent = null; + List eventsToSummarize = new ArrayList<>(); + + // Iterate backwards from the end of the window to summarize. + // We use a single loop to: + // 1. Collect all raw events that happened after the latest compaction. + // 2. Identify the latest compaction event to establish the stop condition (boundary). + ListIterator iter = events.listIterator(events.size()); + while (iter.hasPrevious()) { + Event event = iter.previous(); + + if (!isCompactEvent(event)) { + // Only include events that are strictly after the last compaction range. + if (event.timestamp() > compactionEndTimestamp) { + eventsToSummarize.add(event); + continue; + } else { + // Exit early if we have reached the last event of last compaction range. + break; + } + } + + EventCompaction compaction = event.actions().compaction().orElse(null); + // We use the most recent compaction event to define the time boundary. Any subsequent (older) + // compaction events are ignored. + if (lastCompactionEvent == null) { + compactionEndTimestamp = compaction.endTimestamp(); + lastCompactionEvent = event; + } + } + + // If there are not enough events to summarize, we can return early. + if (eventsToSummarize.size() <= retentionSize) { + return Maybe.empty(); + } + + // Add the last compaction event to the list of events to summarize. + // This is to ensure that the last compaction event is included in the summary. + if (lastCompactionEvent != null) { + EventCompaction compaction = lastCompactionEvent.actions().compaction().get(); + eventsToSummarize.add( + lastCompactionEvent.toBuilder() + .content(compaction.compactedContent()) + // Use the start timestamp so that the new summary covers the entire range. + .timestamp(compaction.startTimestamp()) + .build()); + } + + Collections.reverse(eventsToSummarize); + + // Apply retention: keep the most recent 'retentionSize' events out of the summary. + // We do this by removing them from the list of events to be summarized. + eventsToSummarize + .subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size()) + .clear(); + return Maybe.just(eventsToSummarize); + } + + private static boolean isCompactEvent(Event event) { + return event.actions() != null && event.actions().compaction().isPresent(); + } +} diff --git a/core/src/test/java/com/google/adk/summarizer/TailRetentionEventCompactorTest.java b/core/src/test/java/com/google/adk/summarizer/TailRetentionEventCompactorTest.java new file mode 100644 index 00000000..4088f833 --- /dev/null +++ b/core/src/test/java/com/google/adk/summarizer/TailRetentionEventCompactorTest.java @@ -0,0 +1,193 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.summarizer; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.adk.events.Event; +import com.google.adk.events.EventActions; +import com.google.adk.events.EventCompaction; +import com.google.common.collect.ImmutableList; +import com.google.genai.types.Content; +import com.google.genai.types.Part; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TailRetentionEventCompactorTest { + + @Test + public void getCompactionEvents_notEnoughEvents_returnsEmpty() { + ImmutableList events = + ImmutableList.of( + createEvent(1, "Event1"), createEvent(2, "Event2"), createEvent(3, "Event3")); + + // Retention size 5 > 3 events + TailRetentionEventCompactor.getCompactionEvents(events, 5) + .test() + .assertNoValues() + .assertComplete(); + } + + @Test + public void getCompactionEvents_respectRetentionSize() { + // Retention size is 2. + ImmutableList events = + ImmutableList.of( + createEvent(1, "Event1"), createEvent(2, "Retain1"), createEvent(3, "Retain2")); + + List result = TailRetentionEventCompactor.getCompactionEvents(events, 2).blockingGet(); + + assertThat(result).hasSize(1); + assertThat(getPromptText(result.get(0))).isEqualTo("Event1"); + } + + @Test + public void getCompactionEvents_withRetainedEventsPhysicallyBeforeCompaction_includesThem() { + // Simulating the user's specific case with retention size 1: + // "event1, event2, event3, compaction1-2 ... event3 is retained so it is before compaction + // event" + // + // Timeline: + // T=1: E1 + // T=2: E2 + // T=3: E3 + // T=4: C1 (Covers T=1 to T=2). + // + // Note: C1 was inserted *after* E3 in the list. + // List order: E1, E2, E3, C1. + // + // If we have more events: + // T=5: E5 + // T=6: E6 + // + // Retained: E6. + // Summary Input: C1, E3, E5. (E1, E2 covered by C1). + ImmutableList events = + ImmutableList.of( + createEvent(1, "E1"), + createEvent(2, "E2"), + createEvent(3, "E3"), + createCompactedEvent( + /* startTimestamp= */ 1, /* endTimestamp= */ 2, "C1", /* eventTimestamp= */ 4), + createEvent(5, "E5"), + createEvent(6, "E6")); + + List result = TailRetentionEventCompactor.getCompactionEvents(events, 1).blockingGet(); + + assertThat(result).hasSize(3); + + // Check first event is reconstructed C1 + Event reconstructedC1 = result.get(0); + assertThat(getPromptText(reconstructedC1)).isEqualTo("C1"); + // Verify timestamp is reset to startTimestamp (1) + assertThat(reconstructedC1.timestamp()).isEqualTo(1); + + // Check second event is E3 + Event e3 = result.get(1); + assertThat(getPromptText(e3)).isEqualTo("E3"); + assertThat(e3.timestamp()).isEqualTo(3); + + // Check third event is E5 + Event e5 = result.get(2); + assertThat(getPromptText(e5)).isEqualTo("E5"); + assertThat(e5.timestamp()).isEqualTo(5); + } + + @Test + public void getCompactionEvents_withMultipleCompactionEvents_respectsCompactionBoundary() { + // T=1: E1 + // T=2: E2, retained by C1 + // T=3: E3, retained by C1 + // T=4: E4, retained by C1 and C2 + // T=5: C1 (Covers T=1) + // T=6: E6, retained by C2 + // T=7: E7, retained by C2 + // T=8: C2 (Covers T=1 to T=3) since it covers C1 which starts at T=1. + // T=9: E9 + + // Retention = 3. + // Expected to summarize: C2, E4. (E1 covered by C1 - ignored, E2, E3 covered by C2). + // E6, E7, E9 are retained. + + ImmutableList events = + ImmutableList.of( + createEvent(1, "E1"), + createEvent(2, "E2"), + createEvent(3, "E3"), + createEvent(4, "E4"), + createCompactedEvent( + /* startTimestamp= */ 1, /* endTimestamp= */ 1, "C1", /* eventTimestamp= */ 5), + createEvent(6, "E6"), + createEvent(7, "E7"), + createCompactedEvent( + /* startTimestamp= */ 1, /* endTimestamp= */ 3, "C2", /* eventTimestamp= */ 8), + createEvent(9, "E9")); + + List result = TailRetentionEventCompactor.getCompactionEvents(events, 3).blockingGet(); + + assertThat(result).hasSize(2); + + // Check first event is reconstructed C2 + Event reconstructedC2 = result.get(0); + assertThat(getPromptText(reconstructedC2)).isEqualTo("C2"); + // Verify timestamp is reset to startTimestamp (1), not event timestamp (8) or end timestamp (3) + assertThat(reconstructedC2.timestamp()).isEqualTo(1); + + // Check second event is E4 + Event e4 = result.get(1); + assertThat(e4.timestamp()).isEqualTo(4); + } + + private static Event createEvent(long timestamp, String text) { + return Event.builder() + .timestamp(timestamp) + .content(Content.builder().parts(Part.fromText(text)).build()) + .build(); + } + + private static String getPromptText(Event event) { + return event + .content() + .flatMap(Content::parts) + .flatMap(parts -> parts.stream().findFirst()) + .flatMap(Part::text) + .orElseThrow(); + } + + private Event createCompactedEvent( + long startTimestamp, long endTimestamp, String content, long eventTimestamp) { + return Event.builder() + .timestamp(eventTimestamp) + .actions( + EventActions.builder() + .compaction( + EventCompaction.builder() + .startTimestamp(startTimestamp) + .endTimestamp(endTimestamp) + .compactedContent( + Content.builder() + .role("model") + .parts(Part.builder().text(content).build()) + .build()) + .build()) + .build()) + .build(); + } +}