diff --git a/examples/demos/deep_research/README.md b/examples/demos/deep_research/README.md new file mode 100644 index 000000000..5412bfc07 --- /dev/null +++ b/examples/demos/deep_research/README.md @@ -0,0 +1,143 @@ +# Deep Research Multi-Agent System + +A multi-agent research system built on AgentEx that demonstrates **orchestrator + subagent communication** using Temporal workflows. An orchestrator agent dispatches specialized research subagents (GitHub, Docs, Slack) in parallel, collects their findings, and synthesizes a comprehensive answer. + +## Architecture + +``` + ┌─────────────────────┐ + │ Orchestrator │ + User ────▶│ (GPT-5.1) │ + Query │ Dispatches & │ + │ Synthesizes │ + └───┬─────┬─────┬─────┘ + │ │ │ + ┌─────────┘ │ └─────────┐ + ▼ ▼ ▼ + ┌────────────┐ ┌────────────┐ ┌────────────┐ + │ GitHub │ │ Docs │ │ Slack │ + │ Researcher │ │ Researcher │ │ Researcher │ + │ (GPT-4.1 │ │ (GPT-4.1 │ │ (GPT-4.1 │ + │ mini) │ │ mini) │ │ mini) │ + │ │ │ │ │ │ + │ GitHub MCP│ │ Web Search│ │ Slack MCP │ + │ Server │ │ + Fetcher │ │ Server │ + └────────────┘ └────────────┘ └────────────┘ +``` + +## Key Patterns Demonstrated + +### 1. Multi-Agent Orchestration via ACP +The orchestrator creates child tasks on subagents using `adk.acp.create_task()`, sends queries via `EVENT_SEND`, and waits for `research_complete` callback events. + +### 2. Shared Task ID for Unified Output +All subagents write messages to the **orchestrator's task ID** (passed as `source_task_id`), so the user sees all research progress in a single conversation thread. + +### 3. Conversation Compaction +Subagents use a batched `Runner.run()` pattern with conversation compaction between batches to stay within Temporal's ~2MB payload limit during long research sessions. + +### 4. MCP Server Integration +GitHub and Slack subagents use MCP (Model Context Protocol) servers via `StatelessMCPServerProvider` for tool access. 
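The compaction pattern in item 3 can be sketched as a pure function over the conversation list. This is a simplified, self-contained version of the `compact_tool_outputs` helper that ships in the subagents' `summarization.py` (the real helper also leaves short outputs under 200 characters untouched):

```python
import json

# Keep the last N tool outputs verbatim; older ones are replaced with stubs.
KEEP_RECENT_OUTPUTS = 3
STUB = "[Previous tool output truncated.]"

def estimate_payload_size(items: list[dict]) -> int:
    """Approximate serialized byte size of the conversation."""
    return len(json.dumps(items, default=str))

def compact_tool_outputs(items: list[dict]) -> list[dict]:
    """Stub out all but the most recent KEEP_RECENT_OUTPUTS tool outputs."""
    # Tool outputs appear either as Responses API items (type ==
    # "function_call_output") or Chat Completions messages (role == "tool").
    tool_idx = [
        i for i, item in enumerate(items)
        if item.get("type") == "function_call_output" or item.get("role") == "tool"
    ]
    compacted = list(items)  # shallow copy; only stubbed entries are rebuilt
    for i in tool_idx[:-KEEP_RECENT_OUTPUTS]:
        key = "output" if compacted[i].get("type") == "function_call_output" else "content"
        compacted[i] = {**compacted[i], key: STUB}
    return compacted
```

In the subagent workflows, a `should_compact()` size check gates this call between `Runner.run()` batches, keeping the serialized conversation well under Temporal's ~2MB payload limit.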
+ +## Agents + +| Agent | Port | Model | Tools | +|-------|------|-------|-------| +| Orchestrator | 8010 | gpt-5.1 | dispatch_github, dispatch_docs, dispatch_slack | +| GitHub Researcher | 8011 | gpt-4.1-mini | GitHub MCP (search_code, etc.) | +| Docs Researcher | 8012 | gpt-4.1-mini | web_search (Tavily), fetch_docs_page | +| Slack Researcher | 8013 | gpt-4.1-mini | Slack MCP (search_messages, etc.) | + +## Prerequisites + +- [AgentEx CLI](https://agentex.sgp.scale.com/docs/) installed +- OpenAI API key +- GitHub Personal Access Token (for GitHub researcher) +- Tavily API key (for Docs researcher) - get one at https://tavily.com +- Slack Bot Token (for Slack researcher) + +## Setup + +### 1. Environment Variables + +Create a `.env` file in each agent directory with the required keys: + +**orchestrator/.env:** +``` +OPENAI_API_KEY=your-openai-key +``` + +**github_researcher/.env:** +``` +OPENAI_API_KEY=your-openai-key +GITHUB_PERSONAL_ACCESS_TOKEN=your-github-token +``` + +**docs_researcher/.env:** +``` +OPENAI_API_KEY=your-openai-key +TAVILY_API_KEY=your-tavily-key +``` + +**slack_researcher/.env:** +``` +OPENAI_API_KEY=your-openai-key +SLACK_BOT_TOKEN=your-slack-bot-token +SLACK_TEAM_ID=your-slack-team-id +``` + +### 2. Run All Agents + +Start each agent in a separate terminal: + +```bash +# Terminal 1 - Orchestrator +cd orchestrator +agentex agents run --manifest manifest.yaml + +# Terminal 2 - GitHub Researcher +cd github_researcher +agentex agents run --manifest manifest.yaml + +# Terminal 3 - Docs Researcher +cd docs_researcher +agentex agents run --manifest manifest.yaml + +# Terminal 4 - Slack Researcher +cd slack_researcher +agentex agents run --manifest manifest.yaml +``` + +### 3. Test + +Open the AgentEx UI and send a research question to the orchestrator agent. You should see: +1. The orchestrator dispatching queries to subagents +2. Each subagent streaming its research progress to the same conversation +3. 
The orchestrator synthesizing all findings into a final answer + +## Customization + +### Using Different Research Sources + +You can adapt the subagents to search different sources: +- Replace the GitHub MCP server with any other MCP server +- Replace Tavily with your preferred search API +- Replace the Slack MCP with any communication platform's MCP +- Update the system prompts to match your target repositories, docs, and channels + +### Adding More Subagents + +To add a new research subagent: +1. Copy one of the existing subagent directories +2. Update the manifest.yaml with a new agent name and port +3. Modify the workflow.py system prompt and tools +4. Add a new dispatch tool in the orchestrator's workflow.py + +## How Shared Task ID Works + +The key pattern that makes all agents write to the same conversation: + +1. **Orchestrator** passes its `task_id` as `source_task_id` when creating child tasks +2. **Subagents** extract `parent_task_id = params.task.params.get("source_task_id")` +3. **Subagents** use `message_task_id = parent_task_id or params.task.id` for all `adk.messages.create()` calls and `TemporalStreamingHooks` +4. 
This means all messages and streamed LLM output appear in the orchestrator's task conversation diff --git a/examples/demos/deep_research/docs_researcher/.dockerignore b/examples/demos/deep_research/docs_researcher/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/demos/deep_research/docs_researcher/Dockerfile b/examples/demos/deep_research/docs_researcher/Dockerfile new file mode 100644 index 000000000..98bbba32a --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/Dockerfile @@ -0,0 +1,40 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + build-essential \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY deep_research/docs_researcher/pyproject.toml /app/docs_researcher/pyproject.toml + +WORKDIR /app/docs_researcher + +COPY deep_research/docs_researcher/project /app/docs_researcher/project + +RUN uv pip install --system . 
+ +ENV PYTHONPATH=/app +ENV AGENT_NAME=deep-research-docs + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/demos/deep_research/docs_researcher/environments.yaml b/examples/demos/deep_research/docs_researcher/environments.yaml new file mode 100644 index 000000000..71fc1f08c --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/environments.yaml @@ -0,0 +1,27 @@ +# Agent Environment Configuration +schema_version: "v1" +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/demos/deep_research/docs_researcher/manifest.yaml b/examples/demos/deep_research/docs_researcher/manifest.yaml new file mode 100644 index 000000000..5ed500e7d --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/manifest.yaml @@ -0,0 +1,59 @@ +# Agent Manifest Configuration + +build: + context: + root: ../../ + include_paths: + - deep_research/docs_researcher + dockerfile: deep_research/docs_researcher/Dockerfile + dockerignore: deep_research/docs_researcher/.dockerignore + +local_development: + agent: + port: 8012 + host_address: host.docker.internal + + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: deep-research-docs + description: Searches documentation and the web for relevant guides and references + + temporal: + enabled: true + workflows: + - name: deep-research-docs + queue_name: deep_research_docs_queue + + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + + env: + OPENAI_ORG_ID: "" + +deployment: + image: + repository: "" + tag: "latest" + + imagePullSecrets: + - name: 
my-registry-secret + + global: + agent: + name: "deep-research-docs" + description: "Searches documentation and the web for relevant guides and references" + + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/demos/deep_research/docs_researcher/project/__init__.py b/examples/demos/deep_research/docs_researcher/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/deep_research/docs_researcher/project/acp.py b/examples/demos/deep_research/docs_researcher/project/acp.py new file mode 100644 index 000000000..2b017f344 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/project/acp.py @@ -0,0 +1,29 @@ +import os + +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.types.fastacp import TemporalACPConfig + +context_interceptor = ContextInterceptor() +streaming_model_provider = TemporalStreamingModelProvider() + +# Create the ACP server +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)], + interceptors=[context_interceptor], + ), +) + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(acp, host="0.0.0.0", port=8000) diff --git a/examples/demos/deep_research/docs_researcher/project/activities.py b/examples/demos/deep_research/docs_researcher/project/activities.py new file mode 100644 index 000000000..035a8eb12 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/project/activities.py @@ -0,0 +1,110 @@ 
+"""Activity-based tools for docs research: web search and docs page fetcher.""" +import os +import re + +from temporalio import activity +import httpx + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + + +def _html_to_text(html: str) -> str: + """Basic HTML to text conversion.""" + text = re.sub(r"]*>.*?", "", html, flags=re.DOTALL) + text = re.sub(r"]*>.*?", "", text, flags=re.DOTALL) + text = re.sub(r"<[^>]+>", "\n", text) + text = re.sub(r" ", " ", text) + text = re.sub(r"&", "&", text) + text = re.sub(r"<", "<", text) + text = re.sub(r">", ">", text) + text = re.sub(r"'", "'", text) + text = re.sub(r""", '"', text) + text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() + + +@activity.defn +async def web_search(query: str) -> str: + """Search the web using Tavily and return results from multiple sources. + + Args: + query: The search query to look up on the web. + """ + api_key = os.environ.get("TAVILY_API_KEY", "") + if not api_key: + return "Error: TAVILY_API_KEY not set" + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + "https://api.tavily.com/search", + json={ + "api_key": api_key, + "query": query, + "search_depth": "advanced", + "include_answer": True, + "max_results": 5, + }, + ) + if response.status_code != 200: + return f"Search API error: {response.status_code}" + + data = response.json() + answer = data.get("answer", "") + + results = [] + for r in data.get("results", [])[:5]: + title = r.get("title", "") + content = r.get("content", "") + url = r.get("url", "") + results.append(f"**{title}**\n{content}\nSource: {url}") + + output = f"Search results for '{query}':\n\n" + if answer: + output += f"**Summary:** {answer}\n\n---\n\n" + output += "\n\n---\n\n".join(results) + return output + + except Exception as e: + return f"Error searching: {str(e)}" + + +@activity.defn +async def fetch_docs_page(url: str) -> str: + """Fetch and parse a documentation page. 
+ + Args: + url: The full URL of the documentation page to fetch. Use one of: + https://agentex.sgp.scale.com/docs/... (official docs), + https://deepwiki.com/scaleapi/scale-agentex/... (platform DeepWiki), + https://deepwiki.com/scaleapi/scale-agentex-python/... (SDK DeepWiki) + """ + url = url.strip() + + # Update these allowed prefixes to match your documentation sources + allowed_prefixes = [ + "https://agentex.sgp.scale.com/", + "https://deepwiki.com/scaleapi/scale-agentex", + ] + if not any(url.startswith(prefix) for prefix in allowed_prefixes): + return f"Error: URL must be from an allowed documentation source. Got: {url}" + + try: + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: + response = await client.get(url) + if response.status_code != 200: + return f"Error fetching {url}: HTTP {response.status_code}" + + content_type = response.headers.get("content-type", "") + if "json" in content_type: + return response.text[:10000] + + text = _html_to_text(response.text) + if len(text) > 15000: + text = text[:15000] + "\n\n[... 
truncated, page is very long ...]" + return f"Content from {url}:\n\n{text}" + + except Exception as e: + return f"Error fetching {url}: {str(e)}" diff --git a/examples/demos/deep_research/docs_researcher/project/run_worker.py b/examples/demos/deep_research/docs_researcher/project/run_worker.py new file mode 100644 index 000000000..b610bd8a5 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/project/run_worker.py @@ -0,0 +1,50 @@ +import asyncio + +from datetime import timedelta +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters + +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables + +from project.workflow import DocsResearchWorkflow +from project.activities import web_search, fetch_docs_page + +environment_variables = EnvironmentVariables.refresh() + +logger = make_logger(__name__) + + +async def main(): + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + all_activities = get_all_activities() + [web_search, fetch_docs_page] + + context_interceptor = ContextInterceptor() + + model_params = ModelActivityParameters( + start_to_close_timeout=timedelta(minutes=10), + ) + + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[ + OpenAIAgentsPlugin( + model_params=model_params, + ), + ], + interceptors=[context_interceptor], + ) + + await worker.run( + activities=all_activities, + workflow=DocsResearchWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/demos/deep_research/docs_researcher/project/summarization.py 
b/examples/demos/deep_research/docs_researcher/project/summarization.py new file mode 100644 index 000000000..a7215e595 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/project/summarization.py @@ -0,0 +1,129 @@ +""" +Conversation compaction for research agents. + +Prevents Temporal payload size limit (~2MB) from being exceeded by compacting +old tool outputs between batch iterations of Runner.run(). +""" +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional + +from agents import Agent + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + +# Trigger compaction when serialized conversation exceeds this size (bytes). +# Temporal payload limit is ~2MB; we compact well before that. +COMPACTION_BYTE_THRESHOLD = 800_000 # 800 KB + +# Always keep the last N tool outputs in full (most recent context for the model). +KEEP_RECENT_OUTPUTS = 3 + +# Stub text that replaces truncated tool outputs. +TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]" + + +def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int: + """Estimate the serialized byte size of the conversation.""" + try: + return len(json.dumps(input_list, default=str)) + except Exception: + return sum(len(str(item)) for item in input_list) + + +def should_compact(input_list: List[Dict[str, Any]]) -> bool: + """Check if the conversation payload exceeds the compaction threshold.""" + size = estimate_payload_size(input_list) + logger.info("Conversation payload size: %d bytes", size) + return size > COMPACTION_BYTE_THRESHOLD + + +def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Replace old tool outputs with short stubs to reduce payload size. + + Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full. + Older outputs are replaced with a truncation stub. 
+ + Works with both Responses API format (function_call_output) and + Chat Completions format (role=tool). + """ + # Find indices of tool output items + output_indices = [] + for i, item in enumerate(input_list): + if not isinstance(item, dict): + continue + # Responses API format + if item.get("type") == "function_call_output": + output_indices.append(i) + # Chat Completions format + elif item.get("role") == "tool": + output_indices.append(i) + + if len(output_indices) <= KEEP_RECENT_OUTPUTS: + logger.info("Only %d tool outputs, no compaction needed", len(output_indices)) + return input_list + + # Truncate all but the most recent N outputs + indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS] + compacted = list(input_list) # shallow copy + + for idx in indices_to_truncate: + item = compacted[idx] + # Responses API format + if item.get("type") == "function_call_output": + output_val = item.get("output", "") + if len(str(output_val)) > 200: + compacted[idx] = {**item, "output": TRUNCATED_STUB} + # Chat Completions format + elif item.get("role") == "tool": + content_val = item.get("content", "") + if len(str(content_val)) > 200: + compacted[idx] = {**item, "content": TRUNCATED_STUB} + + before = estimate_payload_size(input_list) + after = estimate_payload_size(compacted) + logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)", + before, after, len(indices_to_truncate)) + return compacted + + +def new_summarization_agent() -> Agent: + """Create a lightweight agent that summarizes research findings.""" + return Agent( + name="ResearchSummarizer", + instructions="""Summarize the research conversation concisely. Focus on: +- Key findings and code references discovered +- File paths, function names, and relevant snippets +- What questions were answered and what gaps remain +- Current state of the research + +Be comprehensive but concise (3-5 paragraphs). 
Focus on OUTCOMES, not listing every tool call.""", + model="gpt-4.1-mini", + tools=[], + ) + + +def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]: + """Find the index of the last summary message.""" + for i in range(len(input_list) - 1, -1, -1): + item = input_list[i] + if isinstance(item, dict) and item.get("_summary") is True: + return i + return None + + +def apply_summary_to_input_list( + input_list: List[Dict[str, Any]], + summary_text: str, + original_query: str, +) -> List[Dict[str, Any]]: + """Replace the conversation with a summary + resume instruction.""" + return [ + {"role": "user", "content": original_query}, + {"role": "assistant", "content": summary_text, "_summary": True}, + {"role": "user", "content": "Use the above summary of your previous research to continue. If you have enough information, provide your final synthesis. Otherwise, continue searching.", "_synthetic": True}, + ] diff --git a/examples/demos/deep_research/docs_researcher/project/workflow.py b/examples/demos/deep_research/docs_researcher/project/workflow.py new file mode 100644 index 000000000..964271056 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/project/workflow.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +import json +from datetime import timedelta +from typing import Any, Dict, List + +from agents import Agent, Runner +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.contrib import openai_agents + +from agentex.lib import adk +from agentex.lib.types.acp import CreateTaskParams, SendEventParams +from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers +from agentex.lib.core.temporal.activities.adk.acp.acp_activities import ( + ACPActivityName, + EventSendParams, +) +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.temporal.types.workflow import SignalName +from 
agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks +from agentex.lib.utils.logging import make_logger +from agentex.types.event import Event +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from project.summarization import ( + should_compact, + compact_tool_outputs, + new_summarization_agent, + apply_summary_to_input_list, +) + +from project.activities import web_search, fetch_docs_page + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") + +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + +TOOL_TIMEOUT = timedelta(minutes=10) + +# Update these URLs to match your own documentation sources +SYSTEM_PROMPT = """You are a documentation research specialist. Search official documentation, DeepWiki pages, and the web to find relevant guides, API references, and explanations. + +You have two tools: +1. web_search - Search the web using Tavily for broad queries +2. fetch_docs_page - Fetch and read specific documentation pages + +KEY DOCUMENTATION SOURCES: +- Official AgentEx Docs: https://agentex.sgp.scale.com/docs/ +- DeepWiki (Platform): https://deepwiki.com/scaleapi/scale-agentex +- DeepWiki (Python SDK): https://deepwiki.com/scaleapi/scale-agentex-python + +RULES: +1. Start with web_search to find relevant doc pages +2. Fetch the most relevant pages with fetch_docs_page +3. 
After 5-6 tool calls, produce your final answer as plain text (no tool call) + +GUIDELINES: +- Check both official docs AND DeepWiki - they complement each other +- Track URLs for citations +- If you see a summary of previous research, build on it rather than repeating searches + +OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end: + +**Findings:** +[Your analysis organized by topic] + +**Sources:** +For every piece of information, cite the source URL: +- [Page title](https://full-url) - brief description of what was found""" + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class DocsResearchWorkflow(BaseWorkflow): + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._research_result: str | None = None + self._task_id: str | None = None + self._trace_id: str | None = None + self._parent_span_id: str | None = None + self._input_list: List[Dict[str, Any]] = [] + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + logger.info("Docs researcher received event: %s", params) + self._task_id = params.task.id + self._trace_id = params.task.id + self._parent_span_id = params.task.id + + payload = self._extract_payload(params) + + parent_task_id = None + parent_agent_name = None + if params.task and getattr(params.task, "params", None): + parent_task_id = params.task.params.get("source_task_id") + parent_agent_name = params.task.params.get("parent_agent_name") + if not parent_task_id: + parent_task_id = payload.get("source_task_id") + if not parent_agent_name: + parent_agent_name = payload.get("parent_agent_name") + + # Write messages to the parent task so everything appears in one conversation + message_task_id = parent_task_id or params.task.id + + query = payload.get("query", payload.get("raw_content", "")) + if not query: + await adk.messages.create( + task_id=message_task_id, 
+ content=TextContent(author="agent", content="No research query provided."), + ) + self._complete_task = True + return + + await adk.messages.create( + task_id=message_task_id, + content=TextContent(author="agent", content=f"Starting documentation research for: {query}"), + ) + + self._input_list.append({"role": "user", "content": query}) + + # Activity-based tools for web search and docs fetching + agent = Agent( + name="DocsResearcher", + instructions=SYSTEM_PROMPT, + model="gpt-4.1-mini", + tools=[ + openai_agents.workflow.activity_as_tool(web_search, start_to_close_timeout=TOOL_TIMEOUT, retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=10), backoff_coefficient=2.0)), + openai_agents.workflow.activity_as_tool(fetch_docs_page, start_to_close_timeout=TOOL_TIMEOUT, retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=10), backoff_coefficient=2.0)), + ], + ) + + hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2)) + + TURNS_PER_BATCH = 7 + MAX_BATCHES = 5 + + for batch_num in range(MAX_BATCHES): + try: + result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH) + self._input_list = result.to_input_list() + + if result.final_output: + self._research_result = result.final_output + break + except Exception as e: + error_msg = str(e) + if "Max turns" in error_msg: + logger.warning("Docs batch %d hit max turns, attempting synthesis", batch_num) + try: + synth_input = self._input_list + [ + {"role": "user", "content": "Synthesize ALL your findings and provide your final comprehensive answer now."} + ] + synth_result = await Runner.run(agent, synth_input, max_turns=2) + self._research_result = synth_result.final_output or "Research incomplete." + except Exception: + self._research_result = f"Docs research exceeded turn limits after {batch_num + 1} batches." 
+ break + else: + logger.warning("Docs research error in batch %d: %s", batch_num, e) + self._research_result = f"Docs research was partially completed but encountered an error: {e}" + break + + if should_compact(self._input_list): + logger.info("Compacting docs conversation after batch %d", batch_num) + self._input_list = compact_tool_outputs(self._input_list) + + if should_compact(self._input_list): + try: + summary_agent = new_summarization_agent() + summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1) + if summary_result.final_output: + self._input_list = apply_summary_to_input_list( + self._input_list, summary_result.final_output, query + ) + except Exception as se: + logger.warning("Summarization failed: %s", se) + else: + if not self._research_result: + self._research_result = "Docs research reached maximum iterations without producing a final result." + + if parent_task_id and parent_agent_name: + await ActivityHelpers.execute_activity( + activity_name=ACPActivityName.EVENT_SEND, + request=EventSendParams( + agent_name=parent_agent_name, + task_id=parent_task_id, + content=TextContent( + author="agent", + content=json.dumps({ + "event_type": "research_complete", + "source_agent": environment_variables.AGENT_NAME, + "child_task_id": params.task.id, + "result": self._research_result or "No results found.", + }), + ), + ), + response_type=Event, + start_to_close_timeout=timedelta(seconds=30), + ) + + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]: + logger.info("Docs research task created: %s", params) + self._task_id = params.task.id + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Docs Research agent initialized. 
Send a research query to begin.", + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await workflow.wait_condition(lambda: workflow.all_handlers_finished()) + + return {"status": "complete", "result": self._research_result} + + def _extract_payload(self, params: SendEventParams) -> dict: + if params.event.content and hasattr(params.event.content, "content"): + raw_content = params.event.content.content or "" + else: + raw_content = "" + if isinstance(raw_content, dict): + return raw_content + if isinstance(raw_content, str): + try: + return json.loads(raw_content) + except json.JSONDecodeError: + return {"raw_content": raw_content} + return {"raw_content": str(raw_content)} diff --git a/examples/demos/deep_research/docs_researcher/pyproject.toml b/examples/demos/deep_research/docs_researcher/pyproject.toml new file mode 100644 index 000000000..5dc7b64a1 --- /dev/null +++ b/examples/demos/deep_research/docs_researcher/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "deep-research-docs" +version = "0.1.0" +description = "Documentation research subagent using web search and page fetching" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk>=0.6.5", + "openai-agents>=0.4.2", + "temporalio>=1.18.2", + "scale-gp", + "httpx", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/demos/deep_research/github_researcher/.dockerignore b/examples/demos/deep_research/github_researcher/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/demos/deep_research/github_researcher/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so 
+.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/demos/deep_research/github_researcher/Dockerfile b/examples/demos/deep_research/github_researcher/Dockerfile new file mode 100644 index 000000000..d1a5005d6 --- /dev/null +++ b/examples/demos/deep_research/github_researcher/Dockerfile @@ -0,0 +1,42 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies (includes nodejs/npm for GitHub MCP server) +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + build-essential \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY deep_research/github_researcher/pyproject.toml /app/github_researcher/pyproject.toml + +WORKDIR /app/github_researcher + +COPY deep_research/github_researcher/project /app/github_researcher/project + +RUN uv pip install --system . 
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-github
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/github_researcher/environments.yaml b/examples/demos/deep_research/github_researcher/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+  dev:
+    auth:
+      principal:
+        user_id: # TODO: Fill in
+        account_id: # TODO: Fill in
+    helm_overrides:
+      replicaCount: 1
+      resources:
+        requests:
+          cpu: "500m"
+          memory: "1Gi"
+        limits:
+          cpu: "1000m"
+          memory: "2Gi"
+      temporal-worker:
+        enabled: true
+        replicaCount: 1
+        resources:
+          requests:
+            cpu: "500m"
+            memory: "1Gi"
+          limits:
+            cpu: "1000m"
+            memory: "2Gi"
diff --git a/examples/demos/deep_research/github_researcher/manifest.yaml b/examples/demos/deep_research/github_researcher/manifest.yaml
new file mode 100644
index 000000000..b3b499d04
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/manifest.yaml
@@ -0,0 +1,59 @@
+# Agent Manifest Configuration
+
+build:
+  context:
+    root: ../../
+    include_paths:
+      - deep_research/github_researcher
+  dockerfile: deep_research/github_researcher/Dockerfile
+  dockerignore: deep_research/github_researcher/.dockerignore
+
+local_development:
+  agent:
+    port: 8011
+    host_address: host.docker.internal
+
+  paths:
+    acp: project/acp.py
+    worker: project/run_worker.py
+
+agent:
+  acp_type: async
+  name: deep-research-github
+  description: Searches GitHub repositories for code, issues, and PRs
+
+  temporal:
+    enabled: true
+    workflows:
+      - name: deep-research-github
+        queue_name: deep_research_github_queue
+
+  credentials:
+    - env_var_name: REDIS_URL
+      secret_name: redis-url-secret
+      secret_key: url
+
+  env:
+    OPENAI_ORG_ID: ""
+
+deployment:
+  image:
+    repository: ""
+    tag: "latest"
+
+  imagePullSecrets:
+    - name: my-registry-secret
+
+  global:
+    agent:
+      name: "deep-research-github"
+      description: "Searches GitHub repositories for code, issues, and PRs"
+
+  replicaCount: 1
+  resources:
+    requests:
+      cpu: "500m"
+      memory: "1Gi"
+    limits:
+      cpu: "1000m"
+      memory: "2Gi"
diff --git a/examples/demos/deep_research/github_researcher/project/__init__.py b/examples/demos/deep_research/github_researcher/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/github_researcher/project/acp.py b/examples/demos/deep_research/github_researcher/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+    TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+    acp_type="async",
+    config=TemporalACPConfig(
+        type="temporal",
+        temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+        plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+        interceptors=[context_interceptor],
+    ),
+)
+
+if __name__ == "__main__":
+    import uvicorn
+
+    uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/github_researcher/project/run_worker.py b/examples/demos/deep_research/github_researcher/project/run_worker.py
new file mode 100644
index 000000000..f19090ed3
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/run_worker.py
@@ -0,0 +1,66 @@
+import asyncio
+import os
+
+from agents.mcp import MCPServerStdio
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters, StatelessMCPServerProvider
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import GitHubResearchWorkflow
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+    task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+    if task_queue_name is None:
+        raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+    github_token = os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", "")
+
+    github_server = StatelessMCPServerProvider(
+        name="GitHubServer",
+        server_factory=lambda: MCPServerStdio(
+            name="GitHubServer",
+            params={
+                "command": "npx",
+                "args": ["-y", "@modelcontextprotocol/server-github"],
+                "env": {**os.environ, "GITHUB_PERSONAL_ACCESS_TOKEN": github_token},
+            },
+        ),
+    )
+
+    all_activities = get_all_activities()
+
+    context_interceptor = ContextInterceptor()
+
+    model_params = ModelActivityParameters(
+        start_to_close_timeout=timedelta(minutes=10),
+    )
+
+    worker = AgentexWorker(
+        task_queue=task_queue_name,
+        plugins=[
+            OpenAIAgentsPlugin(
+                model_params=model_params,
+                mcp_server_providers=[github_server],
+            ),
+        ],
+        interceptors=[context_interceptor],
+    )
+
+    await worker.run(
+        activities=all_activities,
+        workflow=GitHubResearchWorkflow,
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/demos/deep_research/github_researcher/project/summarization.py b/examples/demos/deep_research/github_researcher/project/summarization.py
new file mode 100644
index 000000000..a7215e595
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/summarization.py
@@ -0,0 +1,129 @@
+"""
+Conversation compaction for research agents.
+
+Prevents Temporal payload size limit (~2MB) from being exceeded by compacting
+old tool outputs between batch iterations of Runner.run().
+"""
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List, Optional
+
+from agents import Agent
+
+from agentex.lib.utils.logging import make_logger
+
+logger = make_logger(__name__)
+
+# Trigger compaction when serialized conversation exceeds this size (bytes).
+# Temporal payload limit is ~2MB; we compact well before that.
+COMPACTION_BYTE_THRESHOLD = 800_000  # 800 KB
+
+# Always keep the last N tool outputs in full (most recent context for the model).
+KEEP_RECENT_OUTPUTS = 3
+
+# Stub text that replaces truncated tool outputs.
+TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]"
+
+
+def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int:
+    """Estimate the serialized byte size of the conversation."""
+    try:
+        return len(json.dumps(input_list, default=str))
+    except Exception:
+        return sum(len(str(item)) for item in input_list)
+
+
+def should_compact(input_list: List[Dict[str, Any]]) -> bool:
+    """Check if the conversation payload exceeds the compaction threshold."""
+    size = estimate_payload_size(input_list)
+    logger.info("Conversation payload size: %d bytes", size)
+    return size > COMPACTION_BYTE_THRESHOLD
+
+
+def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """
+    Replace old tool outputs with short stubs to reduce payload size.
+
+    Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full.
+    Older outputs are replaced with a truncation stub.
+
+    Works with both Responses API format (function_call_output) and
+    Chat Completions format (role=tool).
+    """
+    # Find indices of tool output items
+    output_indices = []
+    for i, item in enumerate(input_list):
+        if not isinstance(item, dict):
+            continue
+        # Responses API format
+        if item.get("type") == "function_call_output":
+            output_indices.append(i)
+        # Chat Completions format
+        elif item.get("role") == "tool":
+            output_indices.append(i)
+
+    if len(output_indices) <= KEEP_RECENT_OUTPUTS:
+        logger.info("Only %d tool outputs, no compaction needed", len(output_indices))
+        return input_list
+
+    # Truncate all but the most recent N outputs
+    indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS]
+    compacted = list(input_list)  # shallow copy
+
+    for idx in indices_to_truncate:
+        item = compacted[idx]
+        # Responses API format
+        if item.get("type") == "function_call_output":
+            output_val = item.get("output", "")
+            if len(str(output_val)) > 200:
+                compacted[idx] = {**item, "output": TRUNCATED_STUB}
+        # Chat Completions format
+        elif item.get("role") == "tool":
+            content_val = item.get("content", "")
+            if len(str(content_val)) > 200:
+                compacted[idx] = {**item, "content": TRUNCATED_STUB}
+
+    before = estimate_payload_size(input_list)
+    after = estimate_payload_size(compacted)
+    logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)",
+                before, after, len(indices_to_truncate))
+    return compacted
+
+
+def new_summarization_agent() -> Agent:
+    """Create a lightweight agent that summarizes research findings."""
+    return Agent(
+        name="ResearchSummarizer",
+        instructions="""Summarize the research conversation concisely. Focus on:
+- Key findings and code references discovered
+- File paths, function names, and relevant snippets
+- What questions were answered and what gaps remain
+- Current state of the research
+
+Be comprehensive but concise (3-5 paragraphs). Focus on OUTCOMES, not listing every tool call.""",
+        model="gpt-4.1-mini",
+        tools=[],
+    )
+
+
+def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]:
+    """Find the index of the last summary message."""
+    for i in range(len(input_list) - 1, -1, -1):
+        item = input_list[i]
+        if isinstance(item, dict) and item.get("_summary") is True:
+            return i
+    return None
+
+
+def apply_summary_to_input_list(
+    input_list: List[Dict[str, Any]],
+    summary_text: str,
+    original_query: str,
+) -> List[Dict[str, Any]]:
+    """Replace the conversation with a summary + resume instruction."""
+    return [
+        {"role": "user", "content": original_query},
+        {"role": "assistant", "content": summary_text, "_summary": True},
+        {"role": "user", "content": "Use the above summary of your previous research to continue. If you have enough information, provide your final synthesis. Otherwise, continue searching.", "_synthetic": True},
+    ]
diff --git a/examples/demos/deep_research/github_researcher/project/workflow.py b/examples/demos/deep_research/github_researcher/project/workflow.py
new file mode 100644
index 000000000..5b0628185
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/workflow.py
@@ -0,0 +1,256 @@
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner, ModelSettings
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.contrib import openai_agents
+from temporalio.workflow import ActivityConfig
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+    ACPActivityName,
+    EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+from project.summarization import (
+    should_compact,
+    compact_tool_outputs,
+    new_summarization_agent,
+    apply_summary_to_input_list,
+)
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+    raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+    raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+
+# Update this prompt to target your own repositories
+SYSTEM_PROMPT = """You are a GitHub research specialist. Search GitHub repositories to find relevant code, documentation, issues, and pull requests.
+
+You have access to the GitHub MCP server which lets you search code, read files, list issues, and explore repositories.
+
+TARGET REPOSITORIES:
+- scale-agentex - The AgentEx platform source code
+- scale-agentex-python - The AgentEx Python SDK
+
+RULES:
+1. Make ONE tool call at a time - never call multiple tools in parallel
+2. ONLY use search_code - the search snippets contain enough context
+3. NEVER use get_file_contents - it returns too much data and will crash the workflow
+4. Keep search queries simple and short (e.g. "BaseWorkflow repo:scaleapi/scale-agentex")
+5. After 4-5 searches, produce your final answer as plain text (no tool call)
+
+GUIDELINES:
+- Search both repos when relevant - answers often span platform and SDK
+- Avoid special characters or complex boolean syntax in queries
+- If a search returns nothing, try alternative terms
+- If you see a summary of previous research, build on it rather than repeating searches
+
+OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end:
+
+**Findings:**
+[Your analysis and explanations here]
+
+**Sources:**
+For every piece of information, cite the source using this format:
+- `repo_name/path/to/file.py` (lines X-Y) - brief description of what was found
+- `repo_name#issue_number` - brief description if from an issue/PR"""
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class GitHubResearchWorkflow(BaseWorkflow):
+    def __init__(self) -> None:
+        super().__init__(display_name=environment_variables.AGENT_NAME)
+        self._complete_task = False
+        self._research_result: str | None = None
+        self._task_id: str | None = None
+        self._trace_id: str | None = None
+        self._parent_span_id: str | None = None
+        self._input_list: List[Dict[str, Any]] = []
+
+    @workflow.signal(name=SignalName.RECEIVE_EVENT)
+    async def on_task_event_send(self, params: SendEventParams) -> None:
+        logger.info("GitHub researcher received event: %s", params)
+        self._task_id = params.task.id
+        self._trace_id = params.task.id
+        self._parent_span_id = params.task.id
+
+        payload = self._extract_payload(params)
+
+        parent_task_id = None
+        parent_agent_name = None
+        if params.task and getattr(params.task, "params", None):
+            parent_task_id = params.task.params.get("source_task_id")
+            parent_agent_name = params.task.params.get("parent_agent_name")
+        if not parent_task_id:
+            parent_task_id = payload.get("source_task_id")
+        if not parent_agent_name:
+            parent_agent_name = payload.get("parent_agent_name")
+
+        # Write messages to the parent task so everything appears in one conversation
+        message_task_id = parent_task_id or params.task.id
+
+        query = payload.get("query", payload.get("raw_content", ""))
+        if not query:
+            await adk.messages.create(
+                task_id=message_task_id,
+                content=TextContent(author="agent", content="No research query provided."),
+            )
+            self._complete_task = True
+            return
+
+        await adk.messages.create(
+            task_id=message_task_id,
+            content=TextContent(author="agent", content=f"Starting GitHub research for: {query}"),
+        )
+
+        self._input_list.append({"role": "user", "content": query})
+
+        # Reference MCP server by name (registered on the worker via StatelessMCPServerProvider)
+        github_server = openai_agents.workflow.stateless_mcp_server(
+            "GitHubServer",
+            config=ActivityConfig(
+                start_to_close_timeout=timedelta(minutes=10),
+                retry_policy=RetryPolicy(
+                    maximum_attempts=3,
+                    initial_interval=timedelta(seconds=60),
+                    backoff_coefficient=2.0,
+                ),
+            ),
+        )
+
+        agent = Agent(
+            name="GitHubResearcher",
+            instructions=SYSTEM_PROMPT,
+            model="gpt-4.1-mini",
+            mcp_servers=[github_server],
+            model_settings=ModelSettings(parallel_tool_calls=False),
+        )
+
+        hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2))
+
+        # Run in batches to prevent Temporal payload size limit (~2MB) from being hit.
+        # Between batches, compact old tool outputs so the conversation stays small.
+        TURNS_PER_BATCH = 7
+        MAX_BATCHES = 5
+
+        for batch_num in range(MAX_BATCHES):
+            try:
+                result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH)
+                self._input_list = result.to_input_list()
+
+                if result.final_output:
+                    self._research_result = result.final_output
+                    break
+            except Exception as e:
+                error_msg = str(e)
+                if "Max turns" in error_msg:
+                    logger.warning("Batch %d hit max turns, attempting synthesis", batch_num)
+                    try:
+                        synth_input = self._input_list + [
+                            {"role": "user", "content": "You've done enough research. Synthesize ALL your findings and provide your final comprehensive answer now."}
+                        ]
+                        synth_result = await Runner.run(agent, synth_input, max_turns=2)
+                        self._research_result = synth_result.final_output or "Research incomplete."
+                    except Exception:
+                        self._research_result = f"GitHub research exceeded turn limits after {batch_num + 1} batches."
+                    break
+                else:
+                    logger.warning("GitHub research error in batch %d: %s", batch_num, e)
+                    self._research_result = f"GitHub research was partially completed but encountered an error: {e}"
+                    break
+
+            # Compact conversation between batches to prevent payload growth
+            if should_compact(self._input_list):
+                logger.info("Compacting conversation after batch %d", batch_num)
+                self._input_list = compact_tool_outputs(self._input_list)
+
+                # If still too large after truncation, use summarization agent
+                if should_compact(self._input_list):
+                    logger.info("Still too large after truncation, running summarization agent")
+                    try:
+                        summary_agent = new_summarization_agent()
+                        summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1)
+                        if summary_result.final_output:
+                            self._input_list = apply_summary_to_input_list(
+                                self._input_list, summary_result.final_output, query
+                            )
+                    except Exception as se:
+                        logger.warning("Summarization failed: %s", se)
+        else:
+            # Exhausted all batches without final output
+            if not self._research_result:
+                self._research_result = "GitHub research reached maximum iterations without producing a final result."
+
+        # Send completion event back to parent orchestrator
+        if parent_task_id and parent_agent_name:
+            await ActivityHelpers.execute_activity(
+                activity_name=ACPActivityName.EVENT_SEND,
+                request=EventSendParams(
+                    agent_name=parent_agent_name,
+                    task_id=parent_task_id,
+                    content=TextContent(
+                        author="agent",
+                        content=json.dumps({
+                            "event_type": "research_complete",
+                            "source_agent": environment_variables.AGENT_NAME,
+                            "child_task_id": params.task.id,
+                            "result": self._research_result or "No results found.",
+                        }),
+                    ),
+                ),
+                response_type=Event,
+                start_to_close_timeout=timedelta(seconds=30),
+            )
+
+        self._complete_task = True
+
+    @workflow.run
+    async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]:
+        logger.info("GitHub research task created: %s", params)
+        self._task_id = params.task.id
+
+        await adk.messages.create(
+            task_id=params.task.id,
+            content=TextContent(
+                author="agent",
+                content="GitHub Research agent initialized. Send a research query to begin.",
+            ),
+        )
+
+        await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
+
+        return {"status": "complete", "result": self._research_result}
+
+    def _extract_payload(self, params: SendEventParams) -> dict:
+        if params.event.content and hasattr(params.event.content, "content"):
+            raw_content = params.event.content.content or ""
+        else:
+            raw_content = ""
+        if isinstance(raw_content, dict):
+            return raw_content
+        if isinstance(raw_content, str):
+            try:
+                return json.loads(raw_content)
+            except json.JSONDecodeError:
+                return {"raw_content": raw_content}
+        return {"raw_content": str(raw_content)}
diff --git a/examples/demos/deep_research/github_researcher/pyproject.toml b/examples/demos/deep_research/github_researcher/pyproject.toml
new file mode 100644
index 000000000..6d14961d3
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/pyproject.toml
@@ -0,0 +1,35 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "deep-research-github"
+version = "0.1.0"
+description = "GitHub research subagent using MCP server"
+requires-python = ">=3.12"
+dependencies = [
+    "agentex-sdk>=0.6.5",
+    "openai-agents>=0.4.2",
+    "temporalio>=1.18.2",
+    "scale-gp",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest",
+    "black",
+    "isort",
+    "flake8",
+    "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88
diff --git a/examples/demos/deep_research/orchestrator/.dockerignore b/examples/demos/deep_research/orchestrator/.dockerignore
new file mode 100644
index 000000000..c49489471
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/.dockerignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Environments
+.env**
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Git
+.git
+.gitignore
+
+# Misc
+.DS_Store
diff --git a/examples/demos/deep_research/orchestrator/Dockerfile b/examples/demos/deep_research/orchestrator/Dockerfile
new file mode 100644
index 000000000..0a7602944
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/Dockerfile
@@ -0,0 +1,40 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    htop \
+    vim \
+    curl \
+    tar \
+    python3-dev \
+    build-essential \
+    gcc \
+    cmake \
+    netcat-openbsd \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/**
+
+# Install tctl (Temporal CLI)
+RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \
+    tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \
+    chmod +x /usr/local/bin/tctl && \
+    rm /tmp/tctl.tar.gz
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+COPY deep_research/orchestrator/pyproject.toml /app/orchestrator/pyproject.toml
+
+WORKDIR /app/orchestrator
+
+COPY deep_research/orchestrator/project /app/orchestrator/project
+
+RUN uv pip install --system .
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-orchestrator
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/orchestrator/environments.yaml b/examples/demos/deep_research/orchestrator/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+  dev:
+    auth:
+      principal:
+        user_id: # TODO: Fill in
+        account_id: # TODO: Fill in
+    helm_overrides:
+      replicaCount: 1
+      resources:
+        requests:
+          cpu: "500m"
+          memory: "1Gi"
+        limits:
+          cpu: "1000m"
+          memory: "2Gi"
+      temporal-worker:
+        enabled: true
+        replicaCount: 1
+        resources:
+          requests:
+            cpu: "500m"
+            memory: "1Gi"
+          limits:
+            cpu: "1000m"
+            memory: "2Gi"
diff --git a/examples/demos/deep_research/orchestrator/manifest.yaml b/examples/demos/deep_research/orchestrator/manifest.yaml
new file mode 100644
index 000000000..b3727e20d
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/manifest.yaml
@@ -0,0 +1,64 @@
+# Agent Manifest Configuration
+# ---------------------------
+# This file defines how your agent should be built and deployed.
+
+build:
+  context:
+    root: ../../
+    include_paths:
+      - deep_research/orchestrator
+  dockerfile: deep_research/orchestrator/Dockerfile
+  dockerignore: deep_research/orchestrator/.dockerignore
+
+local_development:
+  agent:
+    port: 8010
+    host_address: host.docker.internal
+
+  paths:
+    acp: project/acp.py
+    worker: project/run_worker.py
+
+agent:
+  acp_type: async
+  name: deep-research-orchestrator
+  description: Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents
+
+  temporal:
+    enabled: true
+    workflows:
+      - name: deep-research-orchestrator
+        queue_name: deep_research_orchestrator_queue
+
+  credentials:
+    - env_var_name: REDIS_URL
+      secret_name: redis-url-secret
+      secret_key: url
+    # - env_var_name: OPENAI_API_KEY
+    #   secret_name: openai-api-key
+    #   secret_key: api-key
+
+  env:
+    OPENAI_ORG_ID: ""
+
+deployment:
+  image:
+    repository: ""
+    tag: "latest"
+
+  imagePullSecrets:
+    - name: my-registry-secret
+
+  global:
+    agent:
+      name: "deep-research-orchestrator"
+      description: "Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents"
+
+  replicaCount: 1
+  resources:
+    requests:
+      cpu: "500m"
+      memory: "1Gi"
+    limits:
+      cpu: "1000m"
+      memory: "2Gi"
diff --git a/examples/demos/deep_research/orchestrator/project/__init__.py b/examples/demos/deep_research/orchestrator/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/orchestrator/project/acp.py b/examples/demos/deep_research/orchestrator/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+    TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+    acp_type="async",
+    config=TemporalACPConfig(
+        type="temporal",
+        temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+        plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+        interceptors=[context_interceptor],
+    ),
+)
+
+if __name__ == "__main__":
+    import uvicorn
+
+    uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/orchestrator/project/prompts.py b/examples/demos/deep_research/orchestrator/project/prompts.py
new file mode 100644
index 000000000..770d79376
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/prompts.py
@@ -0,0 +1,30 @@
+ORCHESTRATOR_SYSTEM_PROMPT = """You are the lead research coordinator. Dispatch specialized research agents and synthesize their findings.
+
+You have 3 research agents:
+
+1. **GitHub Researcher** (dispatch_github_researcher) - Searches scale-agentex (platform) and scale-agentex-python (SDK) repos for code, issues, PRs
+2. **Docs Researcher** (dispatch_docs_researcher) - Searches official AgentEx docs and DeepWiki AI-generated docs
+3. **Slack Researcher** (dispatch_slack_researcher) - Searches configured Slack channels for team discussions
+
+STRATEGY:
+
+1. **Plan**: Analyze the question and decide which agents to dispatch (2-3 agents)
+2. **Dispatch in parallel**: Call multiple dispatch tools simultaneously. Give each agent a SPECIFIC, TARGETED query - not just the user's raw question.
+3. **Evaluate**: After receiving results, assess completeness. Are there gaps? Contradictions? Need more detail?
+4. **Iterate if needed**: Dispatch again with refined queries if results are incomplete or you need to cross-check findings.
+5. **Synthesize**: Once you have comprehensive results, produce your final answer.
+
+QUERY TIPS:
+- For GitHub: mention specific class names, function names, or patterns to search for
+- For Docs: mention specific topics, concepts, or doc page names
+- For Slack: mention specific keywords or topics people would discuss
+
+OUTPUT FORMAT: After receiving all results, write a comprehensive answer with:
+
+1. **Answer** - Clear, direct answer to the question
+2. **Details** - Key findings organized by topic, with code examples where relevant
+3. **Sources** - IMPORTANT: Preserve ALL source citations from the research agents. Organize them by type:
+   - **Code:** `repo/path/file.py` (lines X-Y) - description
+   - **Docs:** [Page title](URL) - description
+   - **Slack:** #channel, @user, date - description
+4. **Gaps** - Any areas where information was limited or unclear"""
diff --git a/examples/demos/deep_research/orchestrator/project/run_worker.py b/examples/demos/deep_research/orchestrator/project/run_worker.py
new file mode 100644
index 000000000..bbadf5e8f
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/run_worker.py
@@ -0,0 +1,55 @@
+import asyncio
+
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+    TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import ResearchOrchestratorWorkflow
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+    task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+    if task_queue_name is None:
+        raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+    all_activities = get_all_activities() + [stream_lifecycle_content]
+
+    context_interceptor = ContextInterceptor()
+    streaming_model_provider = TemporalStreamingModelProvider()
+
+    model_params = ModelActivityParameters(
+        start_to_close_timeout=timedelta(minutes=10),
+    )
+
+    worker = AgentexWorker(
+        task_queue=task_queue_name,
+        plugins=[
+            OpenAIAgentsPlugin(
+                model_params=model_params,
+                model_provider=streaming_model_provider,
+            ),
+        ],
+        interceptors=[context_interceptor],
+    )
+
+    await worker.run(
+        activities=all_activities,
+        workflow=ResearchOrchestratorWorkflow,
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/demos/deep_research/orchestrator/project/workflow.py b/examples/demos/deep_research/orchestrator/project/workflow.py
new file mode 100644
index 000000000..0f07ed97f
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/workflow.py
@@ -0,0 +1,235 @@
+from __future__ import annotations
+
+import asyncio
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner, ModelSettings, function_tool
+from openai.types.shared import Reasoning
+from temporalio import workflow
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+    ACPActivityName,
+    EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.prompts import ORCHESTRATOR_SYSTEM_PROMPT
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+    raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+    raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+# Update these to match your subagent names from their manifest.yaml files
+GITHUB_AGENT_NAME = "deep-research-github"
+DOCS_AGENT_NAME = "deep-research-docs"
+SLACK_AGENT_NAME = "deep-research-slack"
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class ResearchOrchestratorWorkflow(BaseWorkflow):
+    """Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents."""
+
+    def __init__(self) -> None:
+        super().__init__(display_name=environment_variables.AGENT_NAME)
+        self._complete_task = False
+        self._research_result: str | None = None
+        self._task_id: str | None = None
+        self._trace_id: str | None = None
+        self._parent_span_id: str | None = None
+        self._agent_id: str | None = None
+        self._input_list: List[Dict[str, Any]] = []
+        # Stores results from subagents keyed by child_task_id
+        self._subagent_results: Dict[str, str] = {}
+
+    @workflow.signal(name=SignalName.RECEIVE_EVENT)
+    async def on_task_event_send(self, params: SendEventParams) -> None:
+        logger.info("Orchestrator received event: %s", params)
+
+        if self._task_id is None:
+            self._task_id = params.task.id
+        if self._trace_id is None:
+            self._trace_id = params.task.id
+        if self._parent_span_id is None:
+            self._parent_span_id = params.task.id
+        if self._agent_id is None and getattr(params, "agent", None):
+            self._agent_id = params.agent.id
+
+        payload = self._extract_payload(params)
+
+        # Check if this is a research_complete event from a subagent
+        event_type = payload.get("event_type")
+        if event_type == "research_complete":
+            child_task_id = payload.get("child_task_id", "")
+            result = payload.get("result", "")
+            source = payload.get("source_agent", "unknown")
+            self._subagent_results[child_task_id] = result
+            logger.info("Received research result from %s (child: %s)", source, child_task_id)
+            return
+
+        # Otherwise, this is a user query
+        query = payload.get("query", payload.get("raw_content", ""))
+        if not query:
+            await adk.messages.create(
+                task_id=params.task.id,
+                content=TextContent(
+                    author="agent",
+                    content="No research query provided. Please send a question.",
+                ),
+            )
+            return
+
+        await adk.messages.create(
+            task_id=params.task.id,
+            content=TextContent(
+                author="agent",
+                content=(
+                    f"Starting deep research for: {query}\n\n"
+                    "I'll dispatch specialized research agents to search GitHub, documentation, and Slack."
+                ),
+            ),
+        )
+
+        self._input_list.append({"role": "user", "content": query})
+
+        # Create dispatch tools that operate within this workflow's context
+        dispatch_github = self._make_dispatch_tool(GITHUB_AGENT_NAME, "dispatch_github_researcher",
+            "Dispatch the GitHub research agent to search across GitHub repos. "
+            "Returns comprehensive findings from code, issues, and PRs.")
+
+        dispatch_docs = self._make_dispatch_tool(DOCS_AGENT_NAME, "dispatch_docs_researcher",
+            "Dispatch the Docs research agent to search documentation. "
+            "Returns findings from documentation sources.")
+
+        dispatch_slack = self._make_dispatch_tool(SLACK_AGENT_NAME, "dispatch_slack_researcher",
+            "Dispatch the Slack research agent to search Slack channels. 
" + "Returns findings from team discussions.") + + agent = Agent( + name="ResearchOrchestrator", + instructions=ORCHESTRATOR_SYSTEM_PROMPT, + model="gpt-5.1", + tools=[dispatch_github, dispatch_docs, dispatch_slack], + model_settings=ModelSettings( + reasoning=Reasoning(effort="high", summary="auto"), + ), + ) + + hooks = TemporalStreamingHooks(task_id=params.task.id, timeout=timedelta(minutes=2)) + result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=50) + + self._research_result = result.final_output + self._input_list = result.to_input_list() + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]: + logger.info("Research orchestrator task created: %s", params) + self._task_id = params.task.id + self._agent_id = params.agent.id + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + "Deep Research Orchestrator initialized.\n\n" + "Send me a question and I'll coordinate research across " + "GitHub repos, official documentation, and Slack discussions to give you " + "a comprehensive answer." + ), + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await workflow.wait_condition(lambda: workflow.all_handlers_finished()) + + return {"status": "complete", "result": self._research_result} + + def _make_dispatch_tool(self, agent_name: str, tool_name: str, description: str): + """Create a @function_tool that dispatches to a subagent via ACP and waits for results.""" + workflow_instance = self + + @function_tool(name_override=tool_name, description_override=description) + async def dispatch(query: str) -> str: + """ + Args: + query: The specific research query for this subagent. Be specific about + what to search for - include repo names, class/function names, + doc page names, or channel names when possible. 
+ """ + logger.info("Dispatching %s with query: %s", agent_name, query) + + # Create a child task via ACP activity. + # Pass source_task_id so the subagent can write messages to our task. + task = await adk.acp.create_task( + name=f"{agent_name}-{workflow.uuid4()}", + agent_name=agent_name, + params={ + "source_task_id": workflow_instance._task_id, + "parent_agent_name": environment_variables.AGENT_NAME, + }, + ) + child_task_id = task.id + logger.info("Created child task %s for %s", child_task_id, agent_name) + + # Send the query as an event to the child agent + await ActivityHelpers.execute_activity( + activity_name=ACPActivityName.EVENT_SEND, + request=EventSendParams( + agent_name=agent_name, + task_id=child_task_id, + content=TextContent( + author="user", + content=json.dumps({"query": query}), + ), + ), + response_type=Event, + start_to_close_timeout=timedelta(seconds=30), + ) + + # Wait for the subagent to send back a completion event + # The signal handler stores results in _subagent_results by child_task_id + try: + await workflow.wait_condition( + lambda: child_task_id in workflow_instance._subagent_results, + timeout=timedelta(minutes=10), + ) + except asyncio.TimeoutError: + return f"Research from {agent_name} timed out after 10 minutes." 
+ + result = workflow_instance._subagent_results.get(child_task_id, "No result received.") + logger.info("Got result from %s (length: %d)", agent_name, len(result)) + return result + + return dispatch + + def _extract_payload(self, params: SendEventParams) -> dict: + if params.event.content and hasattr(params.event.content, "content"): + raw_content = params.event.content.content or "" + else: + raw_content = "" + if isinstance(raw_content, dict): + return raw_content + if isinstance(raw_content, str): + try: + return json.loads(raw_content) + except json.JSONDecodeError: + return {"raw_content": raw_content} + return {"raw_content": str(raw_content)} diff --git a/examples/demos/deep_research/orchestrator/pyproject.toml b/examples/demos/deep_research/orchestrator/pyproject.toml new file mode 100644 index 000000000..417b0bcac --- /dev/null +++ b/examples/demos/deep_research/orchestrator/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "deep-research-orchestrator" +version = "0.1.0" +description = "Orchestrates deep research by dispatching specialized subagents" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk>=0.6.5", + "openai-agents>=0.4.2", + "temporalio>=1.18.2", + "scale-gp", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/demos/deep_research/slack_researcher/.dockerignore b/examples/demos/deep_research/slack_researcher/.dockerignore new file mode 100644 index 000000000..c49489471 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/.dockerignore @@ -0,0 +1,43 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ 
+parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Environments +.env** +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git +.gitignore + +# Misc +.DS_Store diff --git a/examples/demos/deep_research/slack_researcher/Dockerfile b/examples/demos/deep_research/slack_researcher/Dockerfile new file mode 100644 index 000000000..be8e2bb9e --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/Dockerfile @@ -0,0 +1,42 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies (includes nodejs/npm for Slack MCP server) +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + build-essential \ + gcc \ + cmake \ + netcat-openbsd \ + nodejs \ + npm \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/** + +# Install tctl (Temporal CLI) +RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \ + tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \ + chmod +x /usr/local/bin/tctl && \ + rm /tmp/tctl.tar.gz + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY deep_research/slack_researcher/pyproject.toml /app/slack_researcher/pyproject.toml + +WORKDIR /app/slack_researcher + +COPY deep_research/slack_researcher/project /app/slack_researcher/project + +RUN uv pip install --system . 
+ +ENV PYTHONPATH=/app +ENV AGENT_NAME=deep-research-slack + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/demos/deep_research/slack_researcher/environments.yaml b/examples/demos/deep_research/slack_researcher/environments.yaml new file mode 100644 index 000000000..71fc1f08c --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/environments.yaml @@ -0,0 +1,27 @@ +# Agent Environment Configuration +schema_version: "v1" +environments: + dev: + auth: + principal: + user_id: # TODO: Fill in + account_id: # TODO: Fill in + helm_overrides: + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" + temporal-worker: + enabled: true + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/demos/deep_research/slack_researcher/manifest.yaml b/examples/demos/deep_research/slack_researcher/manifest.yaml new file mode 100644 index 000000000..e305cf387 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/manifest.yaml @@ -0,0 +1,59 @@ +# Agent Manifest Configuration + +build: + context: + root: ../../ + include_paths: + - deep_research/slack_researcher + dockerfile: deep_research/slack_researcher/Dockerfile + dockerignore: deep_research/slack_researcher/.dockerignore + +local_development: + agent: + port: 8013 + host_address: host.docker.internal + + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: deep-research-slack + description: Searches Slack channels for relevant discussions and context + + temporal: + enabled: true + workflows: + - name: deep-research-slack + queue_name: deep_research_slack_queue + + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + + env: + OPENAI_ORG_ID: "" + +deployment: + image: + repository: "" + tag: "latest" + + imagePullSecrets: + - name: 
my-registry-secret + + global: + agent: + name: "deep-research-slack" + description: "Searches Slack channels for relevant discussions and context" + + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/demos/deep_research/slack_researcher/project/__init__.py b/examples/demos/deep_research/slack_researcher/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/demos/deep_research/slack_researcher/project/acp.py b/examples/demos/deep_research/slack_researcher/project/acp.py new file mode 100644 index 000000000..2b017f344 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/project/acp.py @@ -0,0 +1,29 @@ +import os + +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin + +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.types.fastacp import TemporalACPConfig + +context_interceptor = ContextInterceptor() +streaming_model_provider = TemporalStreamingModelProvider() + +# Create the ACP server +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)], + interceptors=[context_interceptor], + ), +) + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(acp, host="0.0.0.0", port=8000) diff --git a/examples/demos/deep_research/slack_researcher/project/run_worker.py b/examples/demos/deep_research/slack_researcher/project/run_worker.py new file mode 100644 index 000000000..e06fffb03 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/project/run_worker.py @@ -0,0 +1,71 @@ +import 
asyncio +import os + +from agents.mcp import MCPServerStdio +from datetime import timedelta +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters, StatelessMCPServerProvider + +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables + +from project.workflow import SlackResearchWorkflow + +environment_variables = EnvironmentVariables.refresh() + +logger = make_logger(__name__) + + +async def main(): + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + slack_token = os.environ.get("SLACK_BOT_TOKEN", "") + slack_team_id = os.environ.get("SLACK_TEAM_ID", "") + + slack_env = {**os.environ, "SLACK_BOT_TOKEN": slack_token} + if slack_team_id: + slack_env["SLACK_TEAM_ID"] = slack_team_id + + slack_server = StatelessMCPServerProvider( + name="SlackServer", + server_factory=lambda: MCPServerStdio( + name="SlackServer", + params={ + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-slack"], + "env": slack_env, + }, + ), + ) + + all_activities = get_all_activities() + + context_interceptor = ContextInterceptor() + + model_params = ModelActivityParameters( + start_to_close_timeout=timedelta(minutes=10), + ) + + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[ + OpenAIAgentsPlugin( + model_params=model_params, + mcp_server_providers=[slack_server], + ), + ], + interceptors=[context_interceptor], + ) + + await worker.run( + activities=all_activities, + workflow=SlackResearchWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git 
a/examples/demos/deep_research/slack_researcher/project/summarization.py b/examples/demos/deep_research/slack_researcher/project/summarization.py new file mode 100644 index 000000000..a7215e595 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/project/summarization.py @@ -0,0 +1,129 @@ +""" +Conversation compaction for research agents. + +Prevents Temporal payload size limit (~2MB) from being exceeded by compacting +old tool outputs between batch iterations of Runner.run(). +""" +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional + +from agents import Agent + +from agentex.lib.utils.logging import make_logger + +logger = make_logger(__name__) + +# Trigger compaction when serialized conversation exceeds this size (bytes). +# Temporal payload limit is ~2MB; we compact well before that. +COMPACTION_BYTE_THRESHOLD = 800_000 # 800 KB + +# Always keep the last N tool outputs in full (most recent context for the model). +KEEP_RECENT_OUTPUTS = 3 + +# Stub text that replaces truncated tool outputs. +TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]" + + +def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int: + """Estimate the serialized byte size of the conversation.""" + try: + return len(json.dumps(input_list, default=str)) + except Exception: + return sum(len(str(item)) for item in input_list) + + +def should_compact(input_list: List[Dict[str, Any]]) -> bool: + """Check if the conversation payload exceeds the compaction threshold.""" + size = estimate_payload_size(input_list) + logger.info("Conversation payload size: %d bytes", size) + return size > COMPACTION_BYTE_THRESHOLD + + +def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Replace old tool outputs with short stubs to reduce payload size. + + Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full. 
+ Older outputs are replaced with a truncation stub. + + Works with both Responses API format (function_call_output) and + Chat Completions format (role=tool). + """ + # Find indices of tool output items + output_indices = [] + for i, item in enumerate(input_list): + if not isinstance(item, dict): + continue + # Responses API format + if item.get("type") == "function_call_output": + output_indices.append(i) + # Chat Completions format + elif item.get("role") == "tool": + output_indices.append(i) + + if len(output_indices) <= KEEP_RECENT_OUTPUTS: + logger.info("Only %d tool outputs, no compaction needed", len(output_indices)) + return input_list + + # Truncate all but the most recent N outputs + indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS] + compacted = list(input_list) # shallow copy + + for idx in indices_to_truncate: + item = compacted[idx] + # Responses API format + if item.get("type") == "function_call_output": + output_val = item.get("output", "") + if len(str(output_val)) > 200: + compacted[idx] = {**item, "output": TRUNCATED_STUB} + # Chat Completions format + elif item.get("role") == "tool": + content_val = item.get("content", "") + if len(str(content_val)) > 200: + compacted[idx] = {**item, "content": TRUNCATED_STUB} + + before = estimate_payload_size(input_list) + after = estimate_payload_size(compacted) + logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)", + before, after, len(indices_to_truncate)) + return compacted + + +def new_summarization_agent() -> Agent: + """Create a lightweight agent that summarizes research findings.""" + return Agent( + name="ResearchSummarizer", + instructions="""Summarize the research conversation concisely. Focus on: +- Key findings and code references discovered +- File paths, function names, and relevant snippets +- What questions were answered and what gaps remain +- Current state of the research + +Be comprehensive but concise (3-5 paragraphs). 
Focus on OUTCOMES, not listing every tool call.""", + model="gpt-4.1-mini", + tools=[], + ) + + +def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]: + """Find the index of the last summary message.""" + for i in range(len(input_list) - 1, -1, -1): + item = input_list[i] + if isinstance(item, dict) and item.get("_summary") is True: + return i + return None + + +def apply_summary_to_input_list( + input_list: List[Dict[str, Any]], + summary_text: str, + original_query: str, +) -> List[Dict[str, Any]]: + """Replace the conversation with a summary + resume instruction.""" + return [ + {"role": "user", "content": original_query}, + {"role": "assistant", "content": summary_text, "_summary": True}, + {"role": "user", "content": "Use the above summary of your previous research to continue. If you have enough information, provide your final synthesis. Otherwise, continue searching.", "_synthetic": True}, + ] diff --git a/examples/demos/deep_research/slack_researcher/project/workflow.py b/examples/demos/deep_research/slack_researcher/project/workflow.py new file mode 100644 index 000000000..01314b615 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/project/workflow.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +import json +from datetime import timedelta +from typing import Any, Dict, List + +from agents import Agent, Runner +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.contrib import openai_agents +from temporalio.workflow import ActivityConfig + +from agentex.lib import adk +from agentex.lib.types.acp import CreateTaskParams, SendEventParams +from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers +from agentex.lib.core.temporal.activities.adk.acp.acp_activities import ( + ACPActivityName, + EventSendParams, +) +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.temporal.types.workflow import SignalName 
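The keep-the-tail policy in `summarization.py` can be exercised in isolation. A condensed sketch of the same logic — `compact`, `KEEP_RECENT`, and `STUB` are my names; the real module additionally handles size estimation and LLM summarization:

```python
KEEP_RECENT = 3          # mirrors KEEP_RECENT_OUTPUTS
STUB = "[truncated]"     # stand-in for TRUNCATED_STUB

def compact(items: list[dict]) -> list[dict]:
    """Stub all but the last KEEP_RECENT tool outputs, as compact_tool_outputs
    does for both Responses-API (type=function_call_output) and
    Chat-Completions (role=tool) items."""
    idxs = [i for i, it in enumerate(items)
            if it.get("type") == "function_call_output" or it.get("role") == "tool"]
    out = list(items)  # shallow copy; original items are never mutated
    for i in idxs[:-KEEP_RECENT]:
        key = "output" if out[i].get("type") == "function_call_output" else "content"
        if len(str(out[i].get(key, ""))) > 200:  # leave short outputs alone
            out[i] = {**out[i], key: STUB}
    return out

# Five oversized tool outputs: the oldest two get stubbed, the newest three survive.
convo = [{"type": "function_call_output", "call_id": str(i), "output": "x" * 500}
         for i in range(5)]
compacted = compact(convo)
print([it["output"][:11] for it in compacted])
```

Note the slicing detail: when there are `KEEP_RECENT` or fewer tool outputs, `idxs[:-KEEP_RECENT]` is empty and nothing is stubbed, matching the early return in the source.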
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks +from agentex.lib.utils.logging import make_logger +from agentex.types.event import Event +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from project.summarization import ( + should_compact, + compact_tool_outputs, + new_summarization_agent, + apply_summary_to_input_list, +) + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") + +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +# Update this prompt with your own Slack channels and channel IDs +SYSTEM_PROMPT = """You are a Slack research specialist. Search Slack for relevant discussions, threads, and context. + +You have access to the Slack MCP server which lets you search messages, read channels, and explore threads. + +KEY SLACK CHANNELS: +- #your-main-channel (Channel ID: YOUR_CHANNEL_ID) - Main discussion channel +- #your-help-channel (Channel ID: YOUR_CHANNEL_ID) - Help and support questions +- #your-team-channel (Channel ID: YOUR_CHANNEL_ID) - Team discussions + +IMPORTANT: Use channel IDs (e.g. "C0123456789") not channel names when calling tools. + +RULES: +1. Use slack_search_messages for searching - NEVER fetch full channel history (too large, will time out) +2. After searching, read 2-3 of the most relevant threads for full context +3. 
After 5-6 tool calls, produce your final answer as plain text (no tool call) + +GUIDELINES: +- Search with different keyword variations to find relevant discussions +- Note who said what and when for attribution +- If you see a summary of previous research, build on it rather than repeating searches + +OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end: + +**Findings:** +[Your analysis of discussions and conclusions] + +**Sources:** +For every piece of information, cite the Slack source: +- #channel-name, @username, date - brief description of what was discussed""" + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class SlackResearchWorkflow(BaseWorkflow): + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._research_result: str | None = None + self._task_id: str | None = None + self._trace_id: str | None = None + self._parent_span_id: str | None = None + self._input_list: List[Dict[str, Any]] = [] + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + logger.info("Slack researcher received event: %s", params) + self._task_id = params.task.id + self._trace_id = params.task.id + self._parent_span_id = params.task.id + + payload = self._extract_payload(params) + + parent_task_id = None + parent_agent_name = None + if params.task and getattr(params.task, "params", None): + parent_task_id = params.task.params.get("source_task_id") + parent_agent_name = params.task.params.get("parent_agent_name") + if not parent_task_id: + parent_task_id = payload.get("source_task_id") + if not parent_agent_name: + parent_agent_name = payload.get("parent_agent_name") + + # Write messages to the parent task so everything appears in one conversation + message_task_id = parent_task_id or params.task.id + + query = payload.get("query", payload.get("raw_content", "")) + if not query: + await 
adk.messages.create( + task_id=message_task_id, + content=TextContent(author="agent", content="No research query provided."), + ) + self._complete_task = True + return + + await adk.messages.create( + task_id=message_task_id, + content=TextContent(author="agent", content=f"Starting Slack research for: {query}"), + ) + + self._input_list.append({"role": "user", "content": query}) + + # Reference MCP server by name (registered on the worker) + slack_server = openai_agents.workflow.stateless_mcp_server( + "SlackServer", + config=ActivityConfig( + start_to_close_timeout=timedelta(minutes=10), + retry_policy=RetryPolicy( + maximum_attempts=3, + initial_interval=timedelta(seconds=10), + backoff_coefficient=2.0, + ), + ), + ) + + agent = Agent( + name="SlackResearcher", + instructions=SYSTEM_PROMPT, + model="gpt-4.1-mini", + mcp_servers=[slack_server], + ) + + hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2)) + + TURNS_PER_BATCH = 7 + MAX_BATCHES = 5 + + for batch_num in range(MAX_BATCHES): + try: + result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH) + self._input_list = result.to_input_list() + + if result.final_output: + self._research_result = result.final_output + break + except Exception as e: + error_msg = str(e) + if "Max turns" in error_msg: + logger.warning("Slack batch %d hit max turns, attempting synthesis", batch_num) + try: + synth_input = self._input_list + [ + {"role": "user", "content": "Synthesize ALL your findings and provide your final comprehensive answer now."} + ] + synth_result = await Runner.run(agent, synth_input, max_turns=2) + self._research_result = synth_result.final_output or "Research incomplete." + except Exception: + self._research_result = f"Slack research exceeded turn limits after {batch_num + 1} batches." 
+ break + else: + logger.warning("Slack research error in batch %d: %s", batch_num, e) + self._research_result = f"Slack research was partially completed but encountered an error: {e}" + break + + if should_compact(self._input_list): + logger.info("Compacting slack conversation after batch %d", batch_num) + self._input_list = compact_tool_outputs(self._input_list) + + if should_compact(self._input_list): + try: + summary_agent = new_summarization_agent() + summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1) + if summary_result.final_output: + self._input_list = apply_summary_to_input_list( + self._input_list, summary_result.final_output, query + ) + except Exception as se: + logger.warning("Summarization failed: %s", se) + else: + if not self._research_result: + self._research_result = "Slack research reached maximum iterations without producing a final result." + + if parent_task_id and parent_agent_name: + await ActivityHelpers.execute_activity( + activity_name=ACPActivityName.EVENT_SEND, + request=EventSendParams( + agent_name=parent_agent_name, + task_id=parent_task_id, + content=TextContent( + author="agent", + content=json.dumps({ + "event_type": "research_complete", + "source_agent": environment_variables.AGENT_NAME, + "child_task_id": params.task.id, + "result": self._research_result or "No results found.", + }), + ), + ), + response_type=Event, + start_to_close_timeout=timedelta(seconds=30), + ) + + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]: + logger.info("Slack research task created: %s", params) + self._task_id = params.task.id + + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content="Slack Research agent initialized. 
Send a research query to begin.", + ), + ) + + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await workflow.wait_condition(lambda: workflow.all_handlers_finished()) + + return {"status": "complete", "result": self._research_result} + + def _extract_payload(self, params: SendEventParams) -> dict: + if params.event.content and hasattr(params.event.content, "content"): + raw_content = params.event.content.content or "" + else: + raw_content = "" + if isinstance(raw_content, dict): + return raw_content + if isinstance(raw_content, str): + try: + return json.loads(raw_content) + except json.JSONDecodeError: + return {"raw_content": raw_content} + return {"raw_content": str(raw_content)} diff --git a/examples/demos/deep_research/slack_researcher/pyproject.toml b/examples/demos/deep_research/slack_researcher/pyproject.toml new file mode 100644 index 000000000..5b9c93d40 --- /dev/null +++ b/examples/demos/deep_research/slack_researcher/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "deep-research-slack" +version = "0.1.0" +description = "Slack research subagent using MCP server" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk>=0.6.5", + "openai-agents>=0.4.2", + "temporalio>=1.18.2", + "scale-gp", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88
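Both workflows end with the same `_extract_payload` fallback chain, which decides whether an incoming event carries a JSON object, a plain string, or something else. Its behavior in isolation — the standalone function name here is mine:

```python
import json

def extract_payload(raw) -> dict:
    """Mirror the _extract_payload fallback: dicts pass through, JSON strings
    are parsed, and anything unparseable is wrapped under raw_content."""
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"raw_content": raw}
    return {"raw_content": str(raw)}

# A subagent callback arrives as a JSON string and parses to a dict;
# a free-form user question falls through to the raw_content wrapper.
print(extract_payload('{"event_type": "research_complete", "result": "..."}'))
print(extract_payload("how does conversation compaction work?"))
```

One observation worth keeping in mind: as in the source, a JSON string that parses to a non-dict (e.g. `"42"`) is returned as-is, so downstream `payload.get(...)` calls implicitly assume event senders only ever serialize objects.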