Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,6 @@ async def _exec_with_plugin(

async with Aclosing(execute_fn(invocation_context)) as agen:
async for event in agen:
_apply_run_config_custom_metadata(
event, invocation_context.run_config
)
if is_live_call:
if event.partial and _is_transcription(event):
is_transcribing = True
Expand All @@ -806,14 +803,14 @@ async def _exec_with_plugin(
'Appending transcription finished event: %s', event
)
if self._should_append_event(event, is_live_call):
await self.session_service.append_event(
session=session, event=event
event = await self._append_event(
invocation_context, session, event
)

for buffered_event in buffered_events:
logger.debug('Appending buffered event: %s', buffered_event)
await self.session_service.append_event(
session=session, event=buffered_event
buffered_event = await self._append_event(
invocation_context, session, buffered_event
)
yield buffered_event # yield buffered events to caller
buffered_events = []
Expand All @@ -822,33 +819,43 @@ async def _exec_with_plugin(
# example, event that stores blob reference, should be appended.
if self._should_append_event(event, is_live_call):
logger.debug('Appending non-buffered event: %s', event)
await self.session_service.append_event(
session=session, event=event
event = await self._append_event(
invocation_context, session, event
)
else:
if event.partial is not True:
await self.session_service.append_event(
session=session, event=event
)
event = await self._append_event(invocation_context, session, event)
yield event
# Run after_run_callbacks to perform global cleanup tasks or finalizing logs and metrics data
# This does NOT emit any event.
await plugin_manager.run_after_run_callback(
invocation_context=invocation_context
)

# Step 3: Run the on_event callbacks to optionally modify the event.
modified_event = await plugin_manager.run_on_event_callback(
invocation_context=invocation_context, event=event
)
if modified_event:
_apply_run_config_custom_metadata(
modified_event, invocation_context.run_config
)
yield modified_event
else:
yield event
async def _append_event(
self,
invocation_context: InvocationContext,
session: Session,
event: Event,
) -> Event:
"""Appends an event to the session with plugin callbacks.

# Step 4: Run the after_run callbacks to perform global cleanup tasks or
# finalizing logs and metrics data.
# This does NOT emit any event.
await plugin_manager.run_after_run_callback(
invocation_context=invocation_context
Args:
invocation_context: The invocation context.
session: The session to append the event to.
event: The event to process and append to the session.

Returns:
The event after processing by plugins.
"""
plugin_manager = invocation_context.plugin_manager
modified_event = await plugin_manager.run_on_event_callback(
invocation_context=invocation_context, event=event
)
if modified_event is not None:
event = modified_event
_apply_run_config_custom_metadata(event, invocation_context.run_config)
await self.session_service.append_event(session=session, event=event)
return event

async def _append_new_message_to_session(
self,
Expand Down
Loading