Skip to content

Commit bac6922

Browse files
committed
fix: deduplicate STATE_DELTA logging and add log_state_changes toggle
- Remove inline STATE_DELTA logging from after_tool_callback to avoid duplicate rows (on_state_change_callback is the canonical path) - Add event ID dedup guard in Runner._exec_with_plugin to prevent the same event from triggering on_state_change_callback twice - Add log_state_changes config flag (default False) to BigQueryLoggerConfig for explicit opt-in to STATE_DELTA logging - Update tests to verify dedup, toggle on, and toggle off behavior
1 parent 87c46a7 commit bac6922

3 files changed

Lines changed: 39 additions & 33 deletions

File tree

src/google/adk/plugins/bigquery_agent_analytics_plugin.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ class BigQueryLoggerConfig:
410410
# Format: "location.connection_id" (e.g. "us.my-connection")
411411
connection_id: Optional[str] = None
412412

413+
# Toggle for state change (STATE_DELTA) logging via on_state_change_callback
414+
log_state_changes: bool = False
413415
# Toggle for session metadata (e.g. gchat thread-id)
414416
log_session_metadata: bool = True
415417
# Static custom tags (e.g. {"agent_role": "sales"})
@@ -2183,6 +2185,8 @@ async def on_state_change_callback(
21832185
state_delta: The change in state to log.
21842186
**kwargs: Additional arguments.
21852187
"""
2188+
if not self.config.log_state_changes:
2189+
return
21862190
await self._log_event(
21872191
"STATE_DELTA",
21882192
callback_context,
@@ -2510,13 +2514,6 @@ async def after_tool_callback(
25102514
parent_span_id_override=parent_span_id,
25112515
)
25122516

2513-
if tool_context.actions.state_delta:
2514-
await self._log_event(
2515-
"STATE_DELTA",
2516-
tool_context,
2517-
state_delta=tool_context.actions.state_delta,
2518-
)
2519-
25202517
async def on_tool_error_callback(
25212518
self,
25222519
*,

src/google/adk/runners.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ async def _exec_with_plugin(
774774
# transcription event.
775775
buffered_events: list[Event] = []
776776
is_transcribing: bool = False
777+
notified_state_change_event_ids: set[str] = set()
777778

778779
async with Aclosing(execute_fn(invocation_context)) as agen:
779780
async for event in agen:
@@ -845,7 +846,11 @@ async def _exec_with_plugin(
845846
yield final_event
846847

847848
# Step 3b: Notify plugins of state changes, if any.
848-
if final_event.actions.state_delta:
849+
if (
850+
final_event.actions.state_delta
851+
and final_event.id not in notified_state_change_event_ids
852+
):
853+
notified_state_change_event_ids.add(final_event.id)
849854
from .agents.callback_context import CallbackContext
850855

851856
await plugin_manager.run_on_state_change_callback(

tests/unittests/plugins/test_bigquery_agent_analytics_plugin.py

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,9 +1560,14 @@ async def test_after_tool_callback_logs_correctly(
15601560
assert content_dict["result"] == {"res": "success"}
15611561

15621562
@pytest.mark.asyncio
1563-
async def test_after_tool_callback_state_delta_logging(
1563+
async def test_after_tool_callback_state_delta_not_logged_inline(
15641564
self, bq_plugin_inst, mock_write_client, tool_context, dummy_arrow_schema
15651565
):
1566+
"""STATE_DELTA is not logged inline by after_tool_callback.
1567+
1568+
State changes are logged exclusively via on_state_change_callback to
1569+
avoid duplicate STATE_DELTA rows.
1570+
"""
15661571
mock_tool = mock.create_autospec(
15671572
base_tool_lib.BaseTool, instance=True, spec_set=True
15681573
)
@@ -1581,31 +1586,12 @@ async def test_after_tool_callback_state_delta_logging(
15811586
)
15821587
await asyncio.sleep(0.01)
15831588

1584-
# We should have two events appended: TOOL_COMPLETED and STATE_DELTA
1585-
assert mock_write_client.append_rows.call_count >= 1
1586-
1587-
# Retrieve all flushed events
1588-
rows = await _get_captured_rows_async(mock_write_client, dummy_arrow_schema)
1589-
assert len(rows) == 2
1590-
1591-
# Sort by event_type to reliably access them
1592-
rows.sort(key=lambda x: x["event_type"])
1593-
1594-
state_delta_event = (
1595-
rows[0] if rows[0]["event_type"] == "STATE_DELTA" else rows[1]
1596-
)
1597-
tool_event = (
1598-
rows[1] if rows[1]["event_type"] == "TOOL_COMPLETED" else rows[0]
1589+
# Only TOOL_COMPLETED should be logged; STATE_DELTA is handled
1590+
# exclusively by on_state_change_callback.
1591+
log_entry = await _get_captured_event_dict_async(
1592+
mock_write_client, dummy_arrow_schema
15991593
)
1600-
1601-
assert state_delta_event["event_type"] == "STATE_DELTA"
1602-
assert tool_event["event_type"] == "TOOL_COMPLETED"
1603-
1604-
# Verify STATE_DELTA payload
1605-
attributes = json.loads(state_delta_event["attributes"])
1606-
assert "state_delta" in attributes
1607-
assert attributes["state_delta"] == {"new_key": "new_value"}
1608-
assert state_delta_event["content"] is None
1594+
assert log_entry["event_type"] == "TOOL_COMPLETED"
16091595

16101596
@pytest.mark.asyncio
16111597
async def test_on_state_change_callback_logs_correctly(
@@ -1615,6 +1601,7 @@ async def test_on_state_change_callback_logs_correctly(
16151601
callback_context,
16161602
dummy_arrow_schema,
16171603
):
1604+
bq_plugin_inst.config.log_state_changes = True
16181605
state_delta = {"key": "value", "new_key": 123}
16191606
bigquery_agent_analytics_plugin.TraceManager.push_span(callback_context)
16201607
await bq_plugin_inst.on_state_change_callback(
@@ -1632,6 +1619,23 @@ async def test_on_state_change_callback_logs_correctly(
16321619
attributes = json.loads(log_entry["attributes"])
16331620
assert attributes["state_delta"] == state_delta
16341621

1622+
@pytest.mark.asyncio
1623+
async def test_on_state_change_callback_disabled(
1624+
self,
1625+
bq_plugin_inst,
1626+
mock_write_client,
1627+
callback_context,
1628+
):
1629+
"""Test that no event is logged when log_state_changes is False."""
1630+
bq_plugin_inst.config.log_state_changes = False
1631+
state_delta = {"key": "value", "new_key": 123}
1632+
bigquery_agent_analytics_plugin.TraceManager.push_span(callback_context)
1633+
await bq_plugin_inst.on_state_change_callback(
1634+
callback_context=callback_context, state_delta=state_delta
1635+
)
1636+
await asyncio.sleep(0.01)
1637+
mock_write_client.append_rows.assert_not_called()
1638+
16351639
@pytest.mark.asyncio
16361640
async def test_log_event_with_session_metadata(
16371641
self,

0 commit comments

Comments
 (0)