Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 118 additions & 37 deletions core/src/main/java/com/google/adk/flows/llmflows/Contents.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -175,17 +174,26 @@ private boolean isEmptyContent(Event event) {

/**
* Filters events that are covered by compaction events by identifying compacted ranges and
* filters out events that are covered by compaction summaries
* filters out events that are covered by compaction summaries. Also filters out redundant
* compaction events (i.e., those fully covered by a later compaction event).
*
* <p>Example of input
* <p>Compaction events are inserted into the stream relative to the events they cover.
* Specifically, a compaction event is placed immediately before the first retained event that
* follows the compaction range (or at the end of the covered range if no events are retained).
* This ensures a logical flow of "Summary of History" -> "Recent/Retained Events".
*
* <p><b>Case 1: Sliding Window + Retention</b>
*
* <p>Compaction events have some overlap but do not fully cover each other. Therefore, all
* compaction events are preserved, as well as the final retained events.
*
* <pre>
* [
* event_1(timestamp=1),
* event_2(timestamp=2),
* compaction_1(event_1, event_2, timestamp=3, content=summary_1_2, startTime=1, endTime=2),
* event_3(timestamp=4),
* compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=3),
* compaction_2(event_2, event_3, timestamp=5, content=summary_2_3, startTime=2, endTime=4),
* event_4(timestamp=6)
* ]
* </pre>
Expand All @@ -200,50 +208,123 @@ private boolean isEmptyContent(Event event) {
* ]
* </pre>
*
* Compaction events are always strictly in order based on event timestamp.
* <p><b>Case 2: Rolling Summary + Retention</b>
*
* <p>The newer compaction event fully covers the older one. Therefore, the older compaction event
* is removed, leaving only the latest summary and the final retained events.
*
* <pre>
* [
* event_1(timestamp=1),
* event_2(timestamp=2),
* event_3(timestamp=3),
* event_4(timestamp=4),
* compaction_1(event_1, timestamp=5, content=summary_1, startTime=1, endTime=1),
* event_6(timestamp=6),
* event_7(timestamp=7),
* compaction_2(compaction_1, event_2, event_3, timestamp=8, content=summary_1_3, startTime=1, endTime=3),
* event_9(timestamp=9)
* ]
* </pre>
*
* Will result in the following events output
*
* <pre>
* [
* compaction_2,
* event_4,
* event_6,
* event_7,
* event_9
* ]
* </pre>
*
* @param events the list of event to filter.
* @return a new list with compaction applied.
*/
private List<Event> processCompactionEvent(List<Event> events) {
// Step 1: Split events into compaction events and regular events.
List<Event> compactionEvents = new ArrayList<>();
List<Event> regularEvents = new ArrayList<>();
for (Event event : events) {
if (event.actions().compaction().isPresent()) {
compactionEvents.add(event);
} else {
regularEvents.add(event);
}
}

// Step 2: Remove redundant compaction events (overlapping ones).
compactionEvents = removeOverlappingCompactions(compactionEvents);

// Step 3: Merge regular events and compaction events based on timestamps.
// We iterate backwards from the latest to the earliest event.
List<Event> result = new ArrayList<>();
ListIterator<Event> iter = events.listIterator(events.size());
Long lastCompactionStartTime = null;
Long lastCompactionEndTime = null;

while (iter.hasPrevious()) {
Event event = iter.previous();
EventCompaction compaction = event.actions().compaction().orElse(null);
if (compaction == null) {
if (lastCompactionStartTime == null
|| event.timestamp() < lastCompactionStartTime
|| (lastCompactionEndTime != null && event.timestamp() > lastCompactionEndTime)) {
result.add(event);
}
continue;
int c = compactionEvents.size() - 1;
int e = regularEvents.size() - 1;
while (e >= 0 && c >= 0) {
Event event = regularEvents.get(e);
EventCompaction compaction = compactionEvents.get(c).actions().compaction().get();

if (event.timestamp() >= compaction.startTimestamp()
&& event.timestamp() <= compaction.endTimestamp()) {
// If the event is covered by compaction, skip it.
e--;
} else if (event.timestamp() > compaction.endTimestamp()) {
// If the event is after compaction, keep it.
result.add(event);
e--;
} else {
// Otherwise the event is before the compaction, let's move to the next compaction event;
result.add(createCompactionEvent(compactionEvents.get(c)));
c--;
}
// Create a new event for the compaction event in the result.
result.add(
Event.builder()
.timestamp(compaction.endTimestamp())
.author("model")
.content(compaction.compactedContent())
.branch(event.branch())
.invocationId(event.invocationId())
.actions(event.actions())
.build());
lastCompactionStartTime =
lastCompactionStartTime == null
? compaction.startTimestamp()
: Long.min(lastCompactionStartTime, compaction.startTimestamp());
lastCompactionEndTime =
lastCompactionEndTime == null
? compaction.endTimestamp()
: Long.max(lastCompactionEndTime, compaction.endTimestamp());
}
// Flush any remaining compactions.
while (c >= 0) {
result.add(createCompactionEvent(compactionEvents.get(c)));
c--;
}
// Flush any remaining regular events.
while (e >= 0) {
result.add(regularEvents.get(e));
e--;
}
return Lists.reverse(result);
}

private static List<Event> removeOverlappingCompactions(List<Event> events) {
List<Event> result = new ArrayList<>();
// Iterate backwards to prioritize later compactions
for (int i = events.size() - 1; i >= 0; i--) {
Event current = events.get(i);
EventCompaction c = current.actions().compaction().get();

// Check if this compaction is covered by any later compaction we've already kept
boolean covered =
result.stream()
.map(e -> e.actions().compaction().get())
.anyMatch(
k ->
c.startTimestamp() >= k.startTimestamp()
&& c.endTimestamp() <= k.endTimestamp());

if (!covered) {
result.add(current);
}
}
return Lists.reverse(result);
}

private static Event createCompactionEvent(Event event) {
EventCompaction compaction = event.actions().compaction().get();
return event.toBuilder()
.timestamp(compaction.endTimestamp())
.author("model")
.content(compaction.compactedContent())
.build();
}

/** Whether the event is a reply from another agent. */
private static boolean isOtherAgentReply(String agentName, Event event) {
return !agentName.isEmpty()
Expand Down
96 changes: 96 additions & 0 deletions core/src/test/java/com/google/adk/flows/llmflows/ContentsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,102 @@ public void processRequest_compactionWithUncompactedEventsBetween() {
.containsExactly("content 3", "Summary 1-2");
}

@Test
public void processRequest_rollingSummary_removesRedundancy() {
// Input: [E1(1), C1(Cover 1-1), E3(3), C2(Cover 1-3)]
// Expected: [C2] (C1 is redundant, E1/E3 are covered).
ImmutableList<Event> events =
ImmutableList.of(
createUserEvent("e1", "E1", "inv1", 1),
createCompactedEvent(1, 1, "C1"),
createUserEvent("e3", "E3", "inv3", 3),
createCompactedEvent(1, 3, "C2"));

List<Content> contents = runContentsProcessor(events);
assertThat(contents)
.comparingElementsUsing(
transforming((Content c) -> c.parts().get().get(0).text().get(), "content text"))
.containsExactly("C2");
}

@Test
public void processRequest_rollingSummaryWithRetention() {
// Input: with retention size 3: [E1, E2, E3, E4, C1(Cover 1-1), E6, E7, C2(Cover 1-3), E9]
// Expected: [C2, E4, E6, E7, E9]
ImmutableList<Event> events =
ImmutableList.of(
createUserEvent("e1", "E1", "inv1", 1),
createUserEvent("e2", "E2", "inv2", 2),
createUserEvent("e3", "E3", "inv3", 3),
createUserEvent("e4", "E4", "inv4", 4),
createCompactedEvent(1, 1, "C1"),
createUserEvent("e6", "E6", "inv6", 6),
createUserEvent("e7", "E7", "inv7", 7),
createCompactedEvent(1, 3, "C2"),
createUserEvent("e9", "E9", "inv9", 9));

List<Content> contents = runContentsProcessor(events);
assertThat(contents)
.comparingElementsUsing(
transforming((Content c) -> c.parts().get().get(0).text().get(), "content text"))
.containsExactly("C2", "E4", "E6", "E7", "E9");
}

@Test
public void processRequest_rollingSummary_preservesUncoveredHistory() {
// Input: [E1(1), E2(2), E3(3), E4(4), C1(2-2), E6(6), E7(7), C2(2-3), E9(9)]
// Expected: [E1, C2, E4, E6, E7, E9]
// E1 is before C1/C2 range, so it is preserved.
// C1 (2-2) is covered by C2 (2-3), so C1 is removed.
// E2, E3 are covered by C2.
// E4, E6, E7, E9 are retained.
ImmutableList<Event> events =
ImmutableList.of(
createUserEvent("e1", "E1", "inv1", 1),
createUserEvent("e2", "E2", "inv2", 2),
createUserEvent("e3", "E3", "inv3", 3),
createUserEvent("e4", "E4", "inv4", 4),
createCompactedEvent(2, 2, "C1"),
createUserEvent("e6", "E6", "inv6", 6),
createUserEvent("e7", "E7", "inv7", 7),
createCompactedEvent(2, 3, "C2"),
createUserEvent("e9", "E9", "inv9", 9));

List<Content> contents = runContentsProcessor(events);
assertThat(contents)
.comparingElementsUsing(
transforming((Content c) -> c.parts().get().get(0).text().get(), "content text"))
.containsExactly("E1", "C2", "E4", "E6", "E7", "E9");
}

@Test
public void processRequest_slidingWindow_preservesOverlappingCompactions() {
// Case 1: Sliding Window + Retention
// Input: [E1(1), E2(2), E3(3), C1(1-2), E4(5), C2(2-3), E5(7)]
// Overlap: C1 and C2 overlap at 2. C1 is NOT redundant (start 1 < start 2).
// Expected: [C1, C2, E4, E5]
// E1(1) covered by C1.
// E2(2) covered by C1 (and C2).
// E3(3) covered by C2.
// E4(5) retained.
// E5(7) retained.
ImmutableList<Event> events =
ImmutableList.of(
createUserEvent("e1", "E1", "inv1", 1),
createUserEvent("e2", "E2", "inv2", 2),
createUserEvent("e3", "E3", "inv3", 3),
createCompactedEvent(1, 2, "C1"),
createUserEvent("e4", "E4", "inv4", 5),
createCompactedEvent(2, 3, "C2"),
createUserEvent("e5", "E5", "inv5", 7));

List<Content> contents = runContentsProcessor(events);
assertThat(contents)
.comparingElementsUsing(
transforming((Content c) -> c.parts().get().get(0).text().get(), "content text"))
.containsExactly("C1", "C2", "E4", "E5");
}

private static Event createUserEvent(String id, String text) {
return Event.builder()
.id(id)
Expand Down