Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.adk.summarizer;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.adk.events.Event;
import com.google.adk.events.EventCompaction;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.Session;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class performs event compaction by retaining the tail of the event stream.
*
* <ul>
* <li>Keeps the {@code retentionSize} most recent events raw.
* <li>Compacts all events that never compacted and older than the retained tail, including the
* most recent compaction event, into a new summary event.
* <li>The new summary event is generated by the {@link BaseEventSummarizer}.
* <li>Appends this new summary event to the end of the event stream.
* </ul>
*
* <p>This compactor produces a rolling summary. Each new compaction event includes the content of
* the previous compaction event (if any) along with new events, effectively superseding all prior
* compactions.
*/
public final class TailRetentionEventCompactor implements EventCompactor {

private static final Logger logger = LoggerFactory.getLogger(TailRetentionEventCompactor.class);

private final BaseEventSummarizer summarizer;
private final int retentionSize;

public TailRetentionEventCompactor(BaseEventSummarizer summarizer, int retentionSize) {
this.summarizer = summarizer;
this.retentionSize = retentionSize;
}

@Override
public Completable compact(Session session, BaseSessionService sessionService) {
checkArgument(summarizer != null, "Missing BaseEventSummarizer for event compaction");
logger.debug("Running tail retention event compaction for session {}", session.id());

return Completable.fromMaybe(
getCompactionEvents(session.events(), retentionSize)
.flatMap(summarizer::summarizeEvents)
.flatMapSingle(e -> sessionService.appendEvent(session, e)));
}

/**
* Identifies events to be compacted based on the tail retention strategy.
*
* <p>This method iterates backwards through the event list to find the most recent compaction
* event (if any) and collects all uncompacted events that occurred after the range covered by
* that compaction. It then applies the retention policy, excluding the most recent {@code
* retentionSize} events from being compacted.
*
* <p><b>Basic Scenario:</b>
*
* <ul>
* <li>Events: E1, E2, E3, E4, E5 (Chronological order)
* <li>Retention Size: 2
* <li>Action: Compaction is triggered. The compactor identifies E1, E2, and E3 as eligible
* since E4, E5 need to be retained.
* <li>Result: E1, E2, E3 are compacted into C1.
* <li>Event stream after compaction: E1, E2, E3, E4, E5, C1. (Compaction event is appended in
* the end.)
* </ul>
*
* <p><b>Advanced Scenario (Handling Gaps):</b>
*
* <p>Consider an edge case where retention size is 3. Event E4 appears before the last compaction
* event (C2) and even the one prior (C1), but remains uncompacted and must be included in the
* third compaction (C3).
*
* <ul>
* <li>T=1: E1
* <li>T=2: E2
* <li>T=3: E3
* <li>T=4: E4
* <li>T=5: C1 (Covers T=1). Generated when getCompactionEvents returned <i>List: E1</i>. E2,
* E3, E4 were preserved.
* <li>T=6: E6
* <li>T=7: E7
* <li>T=8: C2 (Covers T=1 to T=3; starts at T=1 because it includes C1). Generated when
* getCompactionEvents returned <i>List: C1, E2, E3</i>. E4, E6, E7 were preserved.
* <li>T=9: E9.
* </ul>
*
* <p><b>Execution with Retention = 3:</b>
*
* <ol>
* <li>The method scans backward: E9, C2, E7, E6, C1, E4...
* <li><b>C2</b> is identified as the most recent compaction event (end timestamp T=3).
* <li><b>E9, E7, E6</b> are collected as they are newer than T=3.
* <li><b>C1</b> is ignored as we only care about the boundary set by the latest compaction.
* <li><b>E4</b> (T=4) is collected because it is newer than T=3.
* <li>Scanning stops at E3 as it is covered by C2 (timestamp <= T=3).
* <li>The initial list of events to summarize: <b>[E9, E7, E6, E4]</b>.
* <li>After appending the compaction event C2, the list becomes: <b>[E9, E7, E6, E4, C2]</b>
* <li>Reversing the list: <b>[C2, E4, E6, E7, E9]</b>.
* <li>Applying retention (keep last 3): <b>E6, E7, E9</b> are removed from the summary list.
* <li><b>Final Output:</b> {@code [C2, E4]}. E4 and the previous summary C2 will be compacted
* together. The new compaction event will cover the range from the start of the included
* compaction event (C2, T=1) to the end of the new events (E4, T=4).
* </ol>
*/
static Maybe<List<Event>> getCompactionEvents(List<Event> events, int retentionSize) {
long compactionEndTimestamp = Long.MIN_VALUE;
Event lastCompactionEvent = null;
List<Event> eventsToSummarize = new ArrayList<>();

// Iterate backwards from the end of the window to summarize.
// We use a single loop to:
// 1. Collect all raw events that happened after the latest compaction.
// 2. Identify the latest compaction event to establish the stop condition (boundary).
ListIterator<Event> iter = events.listIterator(events.size());
while (iter.hasPrevious()) {
Event event = iter.previous();

if (!isCompactEvent(event)) {
// Only include events that are strictly after the last compaction range.
if (event.timestamp() > compactionEndTimestamp) {
eventsToSummarize.add(event);
continue;
} else {
// Exit early if we have reached the last event of last compaction range.
break;
}
}

EventCompaction compaction = event.actions().compaction().orElse(null);
// We use the most recent compaction event to define the time boundary. Any subsequent (older)
// compaction events are ignored.
if (lastCompactionEvent == null) {
compactionEndTimestamp = compaction.endTimestamp();
lastCompactionEvent = event;
}
}

// If there are not enough events to summarize, we can return early.
if (eventsToSummarize.size() <= retentionSize) {
return Maybe.empty();
}

// Add the last compaction event to the list of events to summarize.
// This is to ensure that the last compaction event is included in the summary.
if (lastCompactionEvent != null) {
EventCompaction compaction = lastCompactionEvent.actions().compaction().get();
eventsToSummarize.add(
lastCompactionEvent.toBuilder()
.content(compaction.compactedContent())
// Use the start timestamp so that the new summary covers the entire range.
.timestamp(compaction.startTimestamp())
.build());
}

Collections.reverse(eventsToSummarize);

// Apply retention: keep the most recent 'retentionSize' events out of the summary.
// We do this by removing them from the list of events to be summarized.
eventsToSummarize
.subList(eventsToSummarize.size() - retentionSize, eventsToSummarize.size())
.clear();
return Maybe.just(eventsToSummarize);
}

private static boolean isCompactEvent(Event event) {
return event.actions() != null && event.actions().compaction().isPresent();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.adk.summarizer;

import static com.google.common.truth.Truth.assertThat;

import com.google.adk.events.Event;
import com.google.adk.events.EventActions;
import com.google.adk.events.EventCompaction;
import com.google.common.collect.ImmutableList;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class TailRetentionEventCompactorTest {

@Test
public void getCompactionEvents_notEnoughEvents_returnsEmpty() {
ImmutableList<Event> events =
ImmutableList.of(
createEvent(1, "Event1"), createEvent(2, "Event2"), createEvent(3, "Event3"));

// Retention size 5 > 3 events
TailRetentionEventCompactor.getCompactionEvents(events, 5)
.test()
.assertNoValues()
.assertComplete();
}

@Test
public void getCompactionEvents_respectRetentionSize() {
// Retention size is 2.
ImmutableList<Event> events =
ImmutableList.of(
createEvent(1, "Event1"), createEvent(2, "Retain1"), createEvent(3, "Retain2"));

List<Event> result = TailRetentionEventCompactor.getCompactionEvents(events, 2).blockingGet();

assertThat(result).hasSize(1);
assertThat(getPromptText(result.get(0))).isEqualTo("Event1");
}

@Test
public void getCompactionEvents_withRetainedEventsPhysicallyBeforeCompaction_includesThem() {
// Simulating the user's specific case with retention size 1:
// "event1, event2, event3, compaction1-2 ... event3 is retained so it is before compaction
// event"
//
// Timeline:
// T=1: E1
// T=2: E2
// T=3: E3
// T=4: C1 (Covers T=1 to T=2).
//
// Note: C1 was inserted *after* E3 in the list.
// List order: E1, E2, E3, C1.
//
// If we have more events:
// T=5: E5
// T=6: E6
//
// Retained: E6.
// Summary Input: C1, E3, E5. (E1, E2 covered by C1).
ImmutableList<Event> events =
ImmutableList.of(
createEvent(1, "E1"),
createEvent(2, "E2"),
createEvent(3, "E3"),
createCompactedEvent(
/* startTimestamp= */ 1, /* endTimestamp= */ 2, "C1", /* eventTimestamp= */ 4),
createEvent(5, "E5"),
createEvent(6, "E6"));

List<Event> result = TailRetentionEventCompactor.getCompactionEvents(events, 1).blockingGet();

assertThat(result).hasSize(3);

// Check first event is reconstructed C1
Event reconstructedC1 = result.get(0);
assertThat(getPromptText(reconstructedC1)).isEqualTo("C1");
// Verify timestamp is reset to startTimestamp (1)
assertThat(reconstructedC1.timestamp()).isEqualTo(1);

// Check second event is E3
Event e3 = result.get(1);
assertThat(getPromptText(e3)).isEqualTo("E3");
assertThat(e3.timestamp()).isEqualTo(3);

// Check third event is E5
Event e5 = result.get(2);
assertThat(getPromptText(e5)).isEqualTo("E5");
assertThat(e5.timestamp()).isEqualTo(5);
}

@Test
public void getCompactionEvents_withMultipleCompactionEvents_respectsCompactionBoundary() {
// T=1: E1
// T=2: E2, retained by C1
// T=3: E3, retained by C1
// T=4: E4, retained by C1 and C2
// T=5: C1 (Covers T=1)
// T=6: E6, retained by C2
// T=7: E7, retained by C2
// T=8: C2 (Covers T=1 to T=3) since it covers C1 which starts at T=1.
// T=9: E9

// Retention = 3.
// Expected to summarize: C2, E4. (E1 covered by C1 - ignored, E2, E3 covered by C2).
// E6, E7, E9 are retained.

ImmutableList<Event> events =
ImmutableList.of(
createEvent(1, "E1"),
createEvent(2, "E2"),
createEvent(3, "E3"),
createEvent(4, "E4"),
createCompactedEvent(
/* startTimestamp= */ 1, /* endTimestamp= */ 1, "C1", /* eventTimestamp= */ 5),
createEvent(6, "E6"),
createEvent(7, "E7"),
createCompactedEvent(
/* startTimestamp= */ 1, /* endTimestamp= */ 3, "C2", /* eventTimestamp= */ 8),
createEvent(9, "E9"));

List<Event> result = TailRetentionEventCompactor.getCompactionEvents(events, 3).blockingGet();

assertThat(result).hasSize(2);

// Check first event is reconstructed C2
Event reconstructedC2 = result.get(0);
assertThat(getPromptText(reconstructedC2)).isEqualTo("C2");
// Verify timestamp is reset to startTimestamp (1), not event timestamp (8) or end timestamp (3)
assertThat(reconstructedC2.timestamp()).isEqualTo(1);

// Check second event is E4
Event e4 = result.get(1);
assertThat(e4.timestamp()).isEqualTo(4);
}

private static Event createEvent(long timestamp, String text) {
return Event.builder()
.timestamp(timestamp)
.content(Content.builder().parts(Part.fromText(text)).build())
.build();
}

private static String getPromptText(Event event) {
return event
.content()
.flatMap(Content::parts)
.flatMap(parts -> parts.stream().findFirst())
.flatMap(Part::text)
.orElseThrow();
}

private Event createCompactedEvent(
long startTimestamp, long endTimestamp, String content, long eventTimestamp) {
return Event.builder()
.timestamp(eventTimestamp)
.actions(
EventActions.builder()
.compaction(
EventCompaction.builder()
.startTimestamp(startTimestamp)
.endTimestamp(endTimestamp)
.compactedContent(
Content.builder()
.role("model")
.parts(Part.builder().text(content).build())
.build())
.build())
.build())
.build();
}
}