Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Documentation = "https://docs.temporal.io/docs/python"
dev = [
"basedpyright==1.34.0",
"cibuildwheel>=2.22.0,<3",
"google-adk @ git+https://github.com/marcusmotill/adk-python-temporal.git@motill/durable-support",
"grpcio-tools>=1.48.2,<2",
"mypy==1.18.2",
"mypy-protobuf>=3.3.0,<4",
Expand Down
81 changes: 81 additions & 0 deletions temporalio/contrib/google_adk_agents/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Google ADK Agents SDK Integration for Temporal

This package provides the integration layer between the Google ADK and Temporal. It allows ADK Agents to run reliably within Temporal Workflows by ensuring determinism and correctly routing external calls (network I/O) through Temporal Activities.

## Core Concepts

### 1. Interception Flow (`AgentPlugin`)

The `AgentPlugin` acts as a middleware that intercepts model calls (e.g., `agent.generate_content`) *before* they execute.

**Workflow Interception:**
1. **Intercept**: The ADK invokes `before_model_callback` when an agent attempts to call a model.
2. **Delegate**: The plugin calls `workflow.execute_activity()`, routing the request to Temporal for execution.
3. **Return**: The plugin awaits the activity result and returns it immediately. The ADK stops its own request processing, using the activity result as the final response.

This ensures that all model interactions are recorded in the Temporal Workflow history, enabling reliable replay and determinism.

### 2. Dynamic Activity Registration

To provide visibility in the Temporal UI, activities are dynamically named after the calling agent (e.g., `MyAgent.generate_content`). Since agent names are not known at startup, the integration uses Temporal's dynamic activity registration.

```python
@activity.defn(dynamic=True)
async def dynamic_activity(args: Sequence[RawValue]) -> Any:
...
```

When the workflow executes an activity with an unknown name (e.g., `MyAgent.generate_content`), the worker routes the call to `dynamic_activity`. This handler inspects the `activity_type` and delegates execution to the appropriate internal logic (`_handle_generate_content`), enabling arbitrary activity names without explicit registration.

### 3. Usage & Configuration

The integration requires setup on both the Agent (Workflow) side and the Worker side.

#### Agent Setup (Workflow Side)
Attach the `AgentPlugin` to your ADK agent. This safely routes model calls through Temporal activities. You **must** provide activity options (e.g., timeouts) as there are no defaults.

```python
from datetime import timedelta
from temporalio.common import RetryPolicy
from google.adk.integrations.temporal import AgentPlugin

# 1. Define Temporal Activity Options
activity_options = {
"start_to_close_timeout": timedelta(minutes=1),
"retry_policy": RetryPolicy(maximum_attempts=3)
}

# 2. Add Plugin to Agent
agent = Agent(
model="gemini-2.5-pro",
plugins=[
# Routes model calls to Temporal Activities
AgentPlugin(activity_options=activity_options)
]
)

# 3. Use Agent in Workflow
# When agent.generate_content() is called, it will execute as a Temporal Activity.
```

#### Worker Setup
Install the `WorkerPlugin` on your Temporal Worker. This handles serialization and runtime determinism.

```python
from temporalio.worker import Worker
from google.adk.integrations.temporal import WorkerPlugin

async def main():
worker = Worker(
client,
task_queue="my-queue",
# Configures ADK Runtime & Pydantic Support
plugins=[WorkerPlugin()]
)
await worker.run()
```

**What `WorkerPlugin` Does:**
* **Data Converter**: Enables Pydantic serialization for ADK objects.
* **Interceptors**: Sets up specific ADK runtime hooks for determinism (replacing `time.time`, `uuid.uuid4`) before workflow execution.
* TODO: is this enough . **Unsandboxed Workflow Runner**: Configures the worker to use the `UnsandboxedWorkflowRunner`, allowing standard imports in ADK agents.
20 changes: 20 additions & 0 deletions temporalio/contrib/google_adk_agents/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Temporal Integration for ADK.

This module provides the necessary components to run ADK Agents within Temporal Workflows.
"""

from temporalio.contrib.google_adk_agents._mcp import (
TemporalMcpToolSet,
TemporalMcpToolSetProvider,
)
from temporalio.contrib.google_adk_agents._plugin import (
AdkAgentPlugin,
TemporalAdkPlugin,
)

__all__ = [
"AdkAgentPlugin",
"TemporalAdkPlugin",
"TemporalMcpToolSet",
"TemporalMcpToolSetProvider",
]
251 changes: 251 additions & 0 deletions temporalio/contrib/google_adk_agents/_mcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable

from google.adk.agents.readonly_context import ReadonlyContext
from google.adk.events import EventActions
from google.adk.tools.base_tool import BaseTool
from google.adk.tools.base_toolset import BaseToolset
from google.adk.tools.mcp_tool import McpToolset
from google.adk.tools.tool_confirmation import ToolConfirmation
from google.adk.tools.tool_context import ToolContext
from google.genai import types
from google.genai.types import FunctionDeclaration

from temporalio import activity, workflow
from temporalio.exceptions import ApplicationError
from temporalio.workflow import ActivityConfig


@dataclass
class _GetToolsArguments:
factory_argument: Any | None


@dataclass
class _ToolResult:
name: str
description: str
is_long_running: bool
custom_metadata: dict[str, Any] | None
function_declaration: FunctionDeclaration | None


@dataclass
class TemporalToolContext:
"""Context for tools running within Temporal workflows.

Provides access to tool confirmation and event actions for ADK integration.
"""
tool_confirmation: ToolConfirmation | None
function_call_id: str | None
event_actions: EventActions

def request_confirmation(
self,
*,
hint: str | None = None,
payload: Any | None = None,
) -> None:
"""Requests confirmation for the given function call.

Args:
hint: A hint to the user on how to confirm the tool call.
payload: The payload used to confirm the tool call.
"""
if not self.function_call_id:
raise ValueError("function_call_id is not set.")
self.event_actions.requested_tool_confirmations[self.function_call_id] = (
ToolConfirmation(
hint=hint or "",
payload=payload,
)
)


@dataclass
class _CallToolResult:
result: Any
tool_context: TemporalToolContext


@dataclass
class _CallToolArguments:
factory_argument: Any | None
name: str
arguments: dict[str, Any]
tool_context: TemporalToolContext


class TemporalMcpToolSetProvider:
"""Provider for creating Temporal-aware MCP toolsets.

Manages the creation of toolset activities and handles tool execution
within Temporal workflows.
"""
def __init__(self, name: str, toolset_factory: Callable[[Any | None], McpToolset]):
"""Initializes the toolset provider.

Args:
name: Name prefix for the generated activities.
toolset_factory: Factory function that creates McpToolset instances.
"""
super().__init__()
self._name = name
self._toolset_factory = toolset_factory

def _get_activities(self) -> Sequence[Callable]:
@activity.defn(name=self._name + "-list-tools")
async def get_tools(
args: _GetToolsArguments,
) -> list[_ToolResult]:
toolset = self._toolset_factory(args.factory_argument)
tools = await toolset.get_tools()
return [
_ToolResult(
tool.name,
tool.description,
tool.is_long_running,
tool.custom_metadata,
tool._get_declaration(),
)
for tool in tools
]

@activity.defn(name=self._name + "-call-tool")
async def call_tool(
args: _CallToolArguments,
) -> _CallToolResult:
toolset = self._toolset_factory(args.factory_argument)
tools = await toolset.get_tools()
tool_match = [tool for tool in tools if tool.name == args.name]
if len(tool_match) == 0:
raise ApplicationError(
f"Unable to find matching mcp tool by name: {args.name}"
)
if len(tool_match) > 1:
raise ApplicationError(
f"Unable too many matching mcp tools by name: {args.name}"
)
tool = tool_match[0]

# We cannot provide a full-fledged ToolContext so we need to provide only what is needed by the tool
result = await tool.run_async(
args=args.arguments,
tool_context=args.tool_context, # type:ignore
)
return _CallToolResult(result=result, tool_context=args.tool_context)

return get_tools, call_tool


class _TemporalTool(BaseTool):
def __init__(
self,
set_name: str,
factory_argument: Any | None,
config: ActivityConfig | None,
declaration: FunctionDeclaration | None,
*,
name: str,
description: str,
is_long_running: bool = False,
custom_metadata: dict[str, Any] | None = None,
):
super().__init__(
name=name,
description=description,
is_long_running=is_long_running,
custom_metadata=custom_metadata,
)
self._set_name = set_name
self._factory_argument = factory_argument
self._config = config or ActivityConfig(
start_to_close_timeout=timedelta(minutes=1)
)
self._declaration = declaration

def _get_declaration(self) -> types.FunctionDeclaration | None:
return self._declaration

async def run_async(
self, *, args: dict[str, Any], tool_context: ToolContext
) -> Any:
result: _CallToolResult = await workflow.execute_activity(
self._set_name + "-call-tool",
_CallToolArguments(
self._factory_argument,
self.name,
arguments=args,
tool_context=TemporalToolContext(
tool_confirmation=tool_context.tool_confirmation,
function_call_id=tool_context.function_call_id,
event_actions=tool_context._event_actions,
),
),
result_type=_CallToolResult,
**self._config,
)

# We need to propagate any event actions back to the main context
tool_context._event_actions = result.tool_context.event_actions
return result.result


class TemporalMcpToolSet(BaseToolset):
"""Temporal-aware MCP toolset implementation.

Executes MCP tools as Temporal activities, providing proper isolation
and execution guarantees within workflows.
"""
def __init__(
self,
name: str,
config: ActivityConfig | None = None,
factory_argument: Any | None = None,
):
"""Initializes the Temporal MCP toolset.

Args:
name: Name of the toolset (used for activity naming).
config: Optional activity configuration.
factory_argument: Optional argument passed to toolset factory.
"""
super().__init__()
self._name = name
self._factory_argument = factory_argument
self._config = config or ActivityConfig(
start_to_close_timeout=timedelta(minutes=1)
)

async def get_tools(
self, readonly_context: ReadonlyContext | None = None
) -> list[BaseTool]:
"""Retrieves available tools from the MCP toolset.

Args:
readonly_context: Optional readonly context (unused in this implementation).

Returns:
List of available tools wrapped as Temporal activities.
"""
tool_results: list[_ToolResult] = await workflow.execute_activity(
self._name + "-list-tools",
_GetToolsArguments(self._factory_argument),
result_type=list[_ToolResult],
**self._config,
)
return [
_TemporalTool(
set_name=self._name,
factory_argument=self._factory_argument,
config=self._config,
declaration=tool_result.function_declaration,
name=tool_result.name,
description=tool_result.description,
is_long_running=tool_result.is_long_running,
custom_metadata=tool_result.custom_metadata,
)
for tool_result in tool_results
]
Loading