diff --git a/examples/demos/deep_research/README.md b/examples/demos/deep_research/README.md
new file mode 100644
index 000000000..5412bfc07
--- /dev/null
+++ b/examples/demos/deep_research/README.md
@@ -0,0 +1,143 @@
+# Deep Research Multi-Agent System
+
+A multi-agent research system built on AgentEx that demonstrates **orchestrator + subagent communication** using Temporal workflows. An orchestrator agent dispatches specialized research subagents (GitHub, Docs, Slack) in parallel, collects their findings, and synthesizes a comprehensive answer.
+
+## Architecture
+
+```
+ ┌─────────────────────┐
+ │ Orchestrator │
+ User ────▶│ (GPT-5.1) │
+ Query │ Dispatches & │
+ │ Synthesizes │
+ └───┬─────┬─────┬─────┘
+ │ │ │
+ ┌─────────┘ │ └─────────┐
+ ▼ ▼ ▼
+ ┌────────────┐ ┌────────────┐ ┌────────────┐
+ │ GitHub │ │ Docs │ │ Slack │
+ │ Researcher │ │ Researcher │ │ Researcher │
+ │ (GPT-4.1 │ │ (GPT-4.1 │ │ (GPT-4.1 │
+ │ mini) │ │ mini) │ │ mini) │
+ │ │ │ │ │ │
+ │ GitHub MCP│ │ Web Search│ │ Slack MCP │
+ │ Server │ │ + Fetcher │ │ Server │
+ └────────────┘ └────────────┘ └────────────┘
+```
+
+## Key Patterns Demonstrated
+
+### 1. Multi-Agent Orchestration via ACP
+The orchestrator creates child tasks on the subagents using `adk.acp.create_task()`, sends each one its query via `EVENT_SEND`, and waits for a `research_complete` callback event from each subagent.
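+The callback itself is plain JSON carried in a text event. A minimal, dependency-free sketch of that contract (field names mirror this example's subagent workflow code; the helper function names are illustrative, not part of the SDK):
+
```python
import json

def build_research_complete_event(source_agent: str, child_task_id: str, result: str) -> str:
    """Serialize the research_complete callback a subagent sends to the orchestrator."""
    return json.dumps({
        "event_type": "research_complete",
        "source_agent": source_agent,
        "child_task_id": child_task_id,
        "result": result,
    })

def parse_callback(raw: str) -> dict:
    """Orchestrator side: parse the event body, tolerating non-JSON content."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {"raw_content": raw}

payload = build_research_complete_event("deep-research-docs", "task-123", "Findings...")
event = parse_callback(payload)
```
+
+In the real workflow the subagent ships this payload through an `EVENT_SEND` activity, and the orchestrator's event handler parses it the same way.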
+
+### 2. Shared Task ID for Unified Output
+All subagents write messages to the **orchestrator's task ID** (passed as `source_task_id`), so the user sees all research progress in a single conversation thread.
+
+### 3. Conversation Compaction
+Subagents use a batched `Runner.run()` pattern with conversation compaction between batches to stay within Temporal's ~2MB payload limit during long research sessions.
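+The core idea can be sketched without any Temporal dependencies; the keep-last-N rule below mirrors this example's `summarization.py` (function names here are illustrative):
+
```python
import json

KEEP_RECENT_OUTPUTS = 3  # keep the newest tool outputs verbatim
STUB = "[Previous tool output truncated.]"

def payload_size(items: list) -> int:
    """Approximate the serialized size Temporal will see."""
    return len(json.dumps(items, default=str))

def compact(items: list) -> list:
    """Stub out all but the last KEEP_RECENT_OUTPUTS tool outputs."""
    tool_idx = [
        i for i, it in enumerate(items)
        if isinstance(it, dict)
        and (it.get("type") == "function_call_output" or it.get("role") == "tool")
    ]
    out = list(items)
    # For len(tool_idx) <= KEEP_RECENT_OUTPUTS this slice is empty: nothing to do.
    for i in tool_idx[:-KEEP_RECENT_OUTPUTS]:
        key = "output" if out[i].get("type") == "function_call_output" else "content"
        out[i] = {**out[i], key: STUB}
    return out
```
+
+In the real workflow this runs between `Runner.run()` batches; when stubbing old outputs is not enough, the conversation is replaced wholesale with an LLM-written summary.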
+
+### 4. MCP Server Integration
+GitHub and Slack subagents use MCP (Model Context Protocol) servers via `StatelessMCPServerProvider` for tool access.
+
+## Agents
+
+| Agent | Port | Model | Tools |
+|-------|------|-------|-------|
+| Orchestrator | 8010 | gpt-5.1 | dispatch_github, dispatch_docs, dispatch_slack |
+| GitHub Researcher | 8011 | gpt-4.1-mini | GitHub MCP (search_code, etc.) |
+| Docs Researcher | 8012 | gpt-4.1-mini | web_search (Tavily), fetch_docs_page |
+| Slack Researcher | 8013 | gpt-4.1-mini | Slack MCP (search_messages, etc.) |
+
+## Prerequisites
+
+- [AgentEx CLI](https://agentex.sgp.scale.com/docs/) installed
+- OpenAI API key
+- GitHub Personal Access Token (for GitHub researcher)
+- Tavily API key (for Docs researcher; get one at https://tavily.com)
+- Slack Bot Token (for Slack researcher)
+
+## Setup
+
+### 1. Environment Variables
+
+Create a `.env` file in each agent directory with the required keys:
+
+**orchestrator/.env:**
+```
+OPENAI_API_KEY=your-openai-key
+```
+
+**github_researcher/.env:**
+```
+OPENAI_API_KEY=your-openai-key
+GITHUB_PERSONAL_ACCESS_TOKEN=your-github-token
+```
+
+**docs_researcher/.env:**
+```
+OPENAI_API_KEY=your-openai-key
+TAVILY_API_KEY=your-tavily-key
+```
+
+**slack_researcher/.env:**
+```
+OPENAI_API_KEY=your-openai-key
+SLACK_BOT_TOKEN=your-slack-bot-token
+SLACK_TEAM_ID=your-slack-team-id
+```
+
+### 2. Run All Agents
+
+Start each agent in a separate terminal:
+
+```bash
+# Terminal 1 - Orchestrator
+cd orchestrator
+agentex agents run --manifest manifest.yaml
+
+# Terminal 2 - GitHub Researcher
+cd github_researcher
+agentex agents run --manifest manifest.yaml
+
+# Terminal 3 - Docs Researcher
+cd docs_researcher
+agentex agents run --manifest manifest.yaml
+
+# Terminal 4 - Slack Researcher
+cd slack_researcher
+agentex agents run --manifest manifest.yaml
+```
+
+### 3. Test
+
+Open the AgentEx UI and send a research question to the orchestrator agent. You should see:
+1. The orchestrator dispatching queries to subagents
+2. Each subagent streaming its research progress to the same conversation
+3. The orchestrator synthesizing all findings into a final answer
+
+## Customization
+
+### Using Different Research Sources
+
+You can adapt the subagents to search different sources:
+- Replace the GitHub MCP server with any other MCP server
+- Replace Tavily with your preferred search API
+- Replace the Slack MCP with any communication platform's MCP
+- Update the system prompts to match your target repositories, docs, and channels
+
+### Adding More Subagents
+
+To add a new research subagent:
+1. Copy one of the existing subagent directories
+2. Update `manifest.yaml` with a new agent name and port
+3. Modify the system prompt and tools in `workflow.py`
+4. Add a matching dispatch tool in the orchestrator's `workflow.py`
+
+## How Shared Task ID Works
+
+The key pattern that makes all agents write to the same conversation:
+
+1. **Orchestrator** passes its `task_id` as `source_task_id` when creating child tasks
+2. **Subagents** extract `parent_task_id = params.task.params.get("source_task_id")`
+3. **Subagents** use `message_task_id = parent_task_id or params.task.id` for all `adk.messages.create()` calls and `TemporalStreamingHooks`
+4. This means all messages and streamed LLM output appear in the orchestrator's task conversation
diff --git a/examples/demos/deep_research/docs_researcher/.dockerignore b/examples/demos/deep_research/docs_researcher/.dockerignore
new file mode 100644
index 000000000..c49489471
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/.dockerignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Environments
+.env*
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Git
+.git
+.gitignore
+
+# Misc
+.DS_Store
diff --git a/examples/demos/deep_research/docs_researcher/Dockerfile b/examples/demos/deep_research/docs_researcher/Dockerfile
new file mode 100644
index 000000000..98bbba32a
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/Dockerfile
@@ -0,0 +1,40 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+ htop \
+ vim \
+ curl \
+ tar \
+ python3-dev \
+ build-essential \
+ gcc \
+ cmake \
+ netcat-openbsd \
+ && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install tctl (Temporal CLI)
+# Note: this pins the linux_arm64 build; switch to linux_amd64 on x86_64 hosts
+RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \
+ tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \
+ chmod +x /usr/local/bin/tctl && \
+ rm /tmp/tctl.tar.gz
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+COPY deep_research/docs_researcher/pyproject.toml /app/docs_researcher/pyproject.toml
+
+WORKDIR /app/docs_researcher
+
+COPY deep_research/docs_researcher/project /app/docs_researcher/project
+
+RUN uv pip install --system .
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-docs
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/docs_researcher/environments.yaml b/examples/demos/deep_research/docs_researcher/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+ dev:
+ auth:
+ principal:
+ user_id: # TODO: Fill in
+ account_id: # TODO: Fill in
+ helm_overrides:
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
+ temporal-worker:
+ enabled: true
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/docs_researcher/manifest.yaml b/examples/demos/deep_research/docs_researcher/manifest.yaml
new file mode 100644
index 000000000..5ed500e7d
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/manifest.yaml
@@ -0,0 +1,59 @@
+# Agent Manifest Configuration
+
+build:
+ context:
+ root: ../../
+ include_paths:
+ - deep_research/docs_researcher
+ dockerfile: deep_research/docs_researcher/Dockerfile
+ dockerignore: deep_research/docs_researcher/.dockerignore
+
+local_development:
+ agent:
+ port: 8012
+ host_address: host.docker.internal
+
+ paths:
+ acp: project/acp.py
+ worker: project/run_worker.py
+
+agent:
+ acp_type: async
+ name: deep-research-docs
+ description: Searches documentation and the web for relevant guides and references
+
+ temporal:
+ enabled: true
+ workflows:
+ - name: deep-research-docs
+ queue_name: deep_research_docs_queue
+
+ credentials:
+ - env_var_name: REDIS_URL
+ secret_name: redis-url-secret
+ secret_key: url
+
+ env:
+ OPENAI_ORG_ID: ""
+
+deployment:
+ image:
+ repository: ""
+ tag: "latest"
+
+ imagePullSecrets:
+ - name: my-registry-secret
+
+ global:
+ agent:
+ name: "deep-research-docs"
+ description: "Searches documentation and the web for relevant guides and references"
+
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/docs_researcher/project/__init__.py b/examples/demos/deep_research/docs_researcher/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/docs_researcher/project/acp.py b/examples/demos/deep_research/docs_researcher/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+ TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+ acp_type="async",
+ config=TemporalACPConfig(
+ type="temporal",
+ temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+ plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+ interceptors=[context_interceptor],
+ ),
+)
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/docs_researcher/project/activities.py b/examples/demos/deep_research/docs_researcher/project/activities.py
new file mode 100644
index 000000000..035a8eb12
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/project/activities.py
@@ -0,0 +1,110 @@
+"""Activity-based tools for docs research: web search and docs page fetcher."""
+import os
+import re
+
+from temporalio import activity
+import httpx
+
+from agentex.lib.utils.logging import make_logger
+
+logger = make_logger(__name__)
+
+
+def _html_to_text(html: str) -> str:
+ """Basic HTML to text conversion."""
+    text = re.sub(r"<script.*?</script>", "", html, flags=re.DOTALL)
+    text = re.sub(r"<style.*?</style>", "", text, flags=re.DOTALL)
+    text = re.sub(r"<[^>]+>", "\n", text)
+    text = re.sub(r"&nbsp;", " ", text)
+    text = re.sub(r"&amp;", "&", text)
+    text = re.sub(r"&lt;", "<", text)
+    text = re.sub(r"&gt;", ">", text)
+    text = re.sub(r"&#39;", "'", text)
+    text = re.sub(r"&quot;", '"', text)
+ text = re.sub(r"\n{3,}", "\n\n", text)
+ return text.strip()
+
+
+@activity.defn
+async def web_search(query: str) -> str:
+ """Search the web using Tavily and return results from multiple sources.
+
+ Args:
+ query: The search query to look up on the web.
+ """
+ api_key = os.environ.get("TAVILY_API_KEY", "")
+ if not api_key:
+ return "Error: TAVILY_API_KEY not set"
+
+ try:
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(
+ "https://api.tavily.com/search",
+ json={
+ "api_key": api_key,
+ "query": query,
+ "search_depth": "advanced",
+ "include_answer": True,
+ "max_results": 5,
+ },
+ )
+ if response.status_code != 200:
+ return f"Search API error: {response.status_code}"
+
+ data = response.json()
+ answer = data.get("answer", "")
+
+ results = []
+ for r in data.get("results", [])[:5]:
+ title = r.get("title", "")
+ content = r.get("content", "")
+ url = r.get("url", "")
+ results.append(f"**{title}**\n{content}\nSource: {url}")
+
+ output = f"Search results for '{query}':\n\n"
+ if answer:
+ output += f"**Summary:** {answer}\n\n---\n\n"
+ output += "\n\n---\n\n".join(results)
+ return output
+
+ except Exception as e:
+ return f"Error searching: {str(e)}"
+
+
+@activity.defn
+async def fetch_docs_page(url: str) -> str:
+ """Fetch and parse a documentation page.
+
+ Args:
+ url: The full URL of the documentation page to fetch. Use one of:
+ https://agentex.sgp.scale.com/docs/... (official docs),
+ https://deepwiki.com/scaleapi/scale-agentex/... (platform DeepWiki),
+ https://deepwiki.com/scaleapi/scale-agentex-python/... (SDK DeepWiki)
+ """
+ url = url.strip()
+
+ # Update these allowed prefixes to match your documentation sources
+ allowed_prefixes = [
+ "https://agentex.sgp.scale.com/",
+ "https://deepwiki.com/scaleapi/scale-agentex",
+ ]
+ if not any(url.startswith(prefix) for prefix in allowed_prefixes):
+ return f"Error: URL must be from an allowed documentation source. Got: {url}"
+
+ try:
+ async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
+ response = await client.get(url)
+ if response.status_code != 200:
+ return f"Error fetching {url}: HTTP {response.status_code}"
+
+ content_type = response.headers.get("content-type", "")
+ if "json" in content_type:
+ return response.text[:10000]
+
+ text = _html_to_text(response.text)
+ if len(text) > 15000:
+ text = text[:15000] + "\n\n[... truncated, page is very long ...]"
+ return f"Content from {url}:\n\n{text}"
+
+ except Exception as e:
+ return f"Error fetching {url}: {str(e)}"
diff --git a/examples/demos/deep_research/docs_researcher/project/run_worker.py b/examples/demos/deep_research/docs_researcher/project/run_worker.py
new file mode 100644
index 000000000..b610bd8a5
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/project/run_worker.py
@@ -0,0 +1,50 @@
+import asyncio
+
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import DocsResearchWorkflow
+from project.activities import web_search, fetch_docs_page
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+ task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+ if task_queue_name is None:
+ raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+ all_activities = get_all_activities() + [web_search, fetch_docs_page]
+
+ context_interceptor = ContextInterceptor()
+
+ model_params = ModelActivityParameters(
+ start_to_close_timeout=timedelta(minutes=10),
+ )
+
+ worker = AgentexWorker(
+ task_queue=task_queue_name,
+ plugins=[
+ OpenAIAgentsPlugin(
+ model_params=model_params,
+ ),
+ ],
+ interceptors=[context_interceptor],
+ )
+
+ await worker.run(
+ activities=all_activities,
+ workflow=DocsResearchWorkflow,
+ )
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/demos/deep_research/docs_researcher/project/summarization.py b/examples/demos/deep_research/docs_researcher/project/summarization.py
new file mode 100644
index 000000000..a7215e595
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/project/summarization.py
@@ -0,0 +1,129 @@
+"""
+Conversation compaction for research agents.
+
+Prevents Temporal payload size limit (~2MB) from being exceeded by compacting
+old tool outputs between batch iterations of Runner.run().
+"""
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List, Optional
+
+from agents import Agent
+
+from agentex.lib.utils.logging import make_logger
+
+logger = make_logger(__name__)
+
+# Trigger compaction when serialized conversation exceeds this size (bytes).
+# Temporal payload limit is ~2MB; we compact well before that.
+COMPACTION_BYTE_THRESHOLD = 800_000 # 800 KB
+
+# Always keep the last N tool outputs in full (most recent context for the model).
+KEEP_RECENT_OUTPUTS = 3
+
+# Stub text that replaces truncated tool outputs.
+TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]"
+
+
+def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int:
+ """Estimate the serialized byte size of the conversation."""
+ try:
+ return len(json.dumps(input_list, default=str))
+ except Exception:
+ return sum(len(str(item)) for item in input_list)
+
+
+def should_compact(input_list: List[Dict[str, Any]]) -> bool:
+ """Check if the conversation payload exceeds the compaction threshold."""
+ size = estimate_payload_size(input_list)
+ logger.info("Conversation payload size: %d bytes", size)
+ return size > COMPACTION_BYTE_THRESHOLD
+
+
+def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """
+ Replace old tool outputs with short stubs to reduce payload size.
+
+ Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full.
+ Older outputs are replaced with a truncation stub.
+
+ Works with both Responses API format (function_call_output) and
+ Chat Completions format (role=tool).
+ """
+ # Find indices of tool output items
+ output_indices = []
+ for i, item in enumerate(input_list):
+ if not isinstance(item, dict):
+ continue
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_indices.append(i)
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ output_indices.append(i)
+
+ if len(output_indices) <= KEEP_RECENT_OUTPUTS:
+ logger.info("Only %d tool outputs, no compaction needed", len(output_indices))
+ return input_list
+
+ # Truncate all but the most recent N outputs
+ indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS]
+ compacted = list(input_list) # shallow copy
+
+ for idx in indices_to_truncate:
+ item = compacted[idx]
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_val = item.get("output", "")
+ if len(str(output_val)) > 200:
+ compacted[idx] = {**item, "output": TRUNCATED_STUB}
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ content_val = item.get("content", "")
+ if len(str(content_val)) > 200:
+ compacted[idx] = {**item, "content": TRUNCATED_STUB}
+
+ before = estimate_payload_size(input_list)
+ after = estimate_payload_size(compacted)
+ logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)",
+ before, after, len(indices_to_truncate))
+ return compacted
+
+
+def new_summarization_agent() -> Agent:
+ """Create a lightweight agent that summarizes research findings."""
+ return Agent(
+ name="ResearchSummarizer",
+ instructions="""Summarize the research conversation concisely. Focus on:
+- Key findings and code references discovered
+- File paths, function names, and relevant snippets
+- What questions were answered and what gaps remain
+- Current state of the research
+
+Be comprehensive but concise (3-5 paragraphs). Focus on OUTCOMES, not listing every tool call.""",
+ model="gpt-4.1-mini",
+ tools=[],
+ )
+
+
+def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]:
+ """Find the index of the last summary message."""
+ for i in range(len(input_list) - 1, -1, -1):
+ item = input_list[i]
+ if isinstance(item, dict) and item.get("_summary") is True:
+ return i
+ return None
+
+
+def apply_summary_to_input_list(
+ input_list: List[Dict[str, Any]],
+ summary_text: str,
+ original_query: str,
+) -> List[Dict[str, Any]]:
+ """Replace the conversation with a summary + resume instruction."""
+ return [
+ {"role": "user", "content": original_query},
+ {"role": "assistant", "content": summary_text, "_summary": True},
+ {"role": "user", "content": "Use the above summary of your previous research to continue. If you have enough information, provide your final synthesis. Otherwise, continue searching.", "_synthetic": True},
+ ]
diff --git a/examples/demos/deep_research/docs_researcher/project/workflow.py b/examples/demos/deep_research/docs_researcher/project/workflow.py
new file mode 100644
index 000000000..964271056
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/project/workflow.py
@@ -0,0 +1,240 @@
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.contrib import openai_agents
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+ ACPActivityName,
+ EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+from project.summarization import (
+ should_compact,
+ compact_tool_outputs,
+ new_summarization_agent,
+ apply_summary_to_input_list,
+)
+
+from project.activities import web_search, fetch_docs_page
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+ raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+ raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+TOOL_TIMEOUT = timedelta(minutes=10)
+
+# Update these URLs to match your own documentation sources
+SYSTEM_PROMPT = """You are a documentation research specialist. Search official documentation, DeepWiki pages, and the web to find relevant guides, API references, and explanations.
+
+You have two tools:
+1. web_search - Search the web using Tavily for broad queries
+2. fetch_docs_page - Fetch and read specific documentation pages
+
+KEY DOCUMENTATION SOURCES:
+- Official AgentEx Docs: https://agentex.sgp.scale.com/docs/
+- DeepWiki (Platform): https://deepwiki.com/scaleapi/scale-agentex
+- DeepWiki (Python SDK): https://deepwiki.com/scaleapi/scale-agentex-python
+
+RULES:
+1. Start with web_search to find relevant doc pages
+2. Fetch the most relevant pages with fetch_docs_page
+3. After 5-6 tool calls, produce your final answer as plain text (no tool call)
+
+GUIDELINES:
+- Check both official docs AND DeepWiki - they complement each other
+- Track URLs for citations
+- If you see a summary of previous research, build on it rather than repeating searches
+
+OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end:
+
+**Findings:**
+[Your analysis organized by topic]
+
+**Sources:**
+For every piece of information, cite the source URL:
+- [Page title](https://full-url) - brief description of what was found"""
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class DocsResearchWorkflow(BaseWorkflow):
+ def __init__(self) -> None:
+ super().__init__(display_name=environment_variables.AGENT_NAME)
+ self._complete_task = False
+ self._research_result: str | None = None
+ self._task_id: str | None = None
+ self._trace_id: str | None = None
+ self._parent_span_id: str | None = None
+ self._input_list: List[Dict[str, Any]] = []
+
+ @workflow.signal(name=SignalName.RECEIVE_EVENT)
+ async def on_task_event_send(self, params: SendEventParams) -> None:
+ logger.info("Docs researcher received event: %s", params)
+ self._task_id = params.task.id
+ self._trace_id = params.task.id
+ self._parent_span_id = params.task.id
+
+ payload = self._extract_payload(params)
+
+ parent_task_id = None
+ parent_agent_name = None
+ if params.task and getattr(params.task, "params", None):
+ parent_task_id = params.task.params.get("source_task_id")
+ parent_agent_name = params.task.params.get("parent_agent_name")
+ if not parent_task_id:
+ parent_task_id = payload.get("source_task_id")
+ if not parent_agent_name:
+ parent_agent_name = payload.get("parent_agent_name")
+
+ # Write messages to the parent task so everything appears in one conversation
+ message_task_id = parent_task_id or params.task.id
+
+ query = payload.get("query", payload.get("raw_content", ""))
+ if not query:
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content="No research query provided."),
+ )
+ self._complete_task = True
+ return
+
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content=f"Starting documentation research for: {query}"),
+ )
+
+ self._input_list.append({"role": "user", "content": query})
+
+ # Activity-based tools for web search and docs fetching
+ agent = Agent(
+ name="DocsResearcher",
+ instructions=SYSTEM_PROMPT,
+ model="gpt-4.1-mini",
+            tools=[
+                openai_agents.workflow.activity_as_tool(
+                    web_search,
+                    start_to_close_timeout=TOOL_TIMEOUT,
+                    retry_policy=RetryPolicy(
+                        maximum_attempts=3,
+                        initial_interval=timedelta(seconds=10),
+                        backoff_coefficient=2.0,
+                    ),
+                ),
+                openai_agents.workflow.activity_as_tool(
+                    fetch_docs_page,
+                    start_to_close_timeout=TOOL_TIMEOUT,
+                    retry_policy=RetryPolicy(
+                        maximum_attempts=3,
+                        initial_interval=timedelta(seconds=10),
+                        backoff_coefficient=2.0,
+                    ),
+                ),
+            ],
+ )
+
+ hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2))
+
+ TURNS_PER_BATCH = 7
+ MAX_BATCHES = 5
+
+ for batch_num in range(MAX_BATCHES):
+ try:
+ result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH)
+ self._input_list = result.to_input_list()
+
+ if result.final_output:
+ self._research_result = result.final_output
+ break
+ except Exception as e:
+ error_msg = str(e)
+ if "Max turns" in error_msg:
+ logger.warning("Docs batch %d hit max turns, attempting synthesis", batch_num)
+ try:
+ synth_input = self._input_list + [
+ {"role": "user", "content": "Synthesize ALL your findings and provide your final comprehensive answer now."}
+ ]
+ synth_result = await Runner.run(agent, synth_input, max_turns=2)
+ self._research_result = synth_result.final_output or "Research incomplete."
+ except Exception:
+ self._research_result = f"Docs research exceeded turn limits after {batch_num + 1} batches."
+ break
+ else:
+ logger.warning("Docs research error in batch %d: %s", batch_num, e)
+ self._research_result = f"Docs research was partially completed but encountered an error: {e}"
+ break
+
+ if should_compact(self._input_list):
+ logger.info("Compacting docs conversation after batch %d", batch_num)
+ self._input_list = compact_tool_outputs(self._input_list)
+
+ if should_compact(self._input_list):
+ try:
+ summary_agent = new_summarization_agent()
+ summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1)
+ if summary_result.final_output:
+ self._input_list = apply_summary_to_input_list(
+ self._input_list, summary_result.final_output, query
+ )
+ except Exception as se:
+ logger.warning("Summarization failed: %s", se)
+ else:
+ if not self._research_result:
+ self._research_result = "Docs research reached maximum iterations without producing a final result."
+
+ if parent_task_id and parent_agent_name:
+ await ActivityHelpers.execute_activity(
+ activity_name=ACPActivityName.EVENT_SEND,
+ request=EventSendParams(
+ agent_name=parent_agent_name,
+ task_id=parent_task_id,
+ content=TextContent(
+ author="agent",
+ content=json.dumps({
+ "event_type": "research_complete",
+ "source_agent": environment_variables.AGENT_NAME,
+ "child_task_id": params.task.id,
+ "result": self._research_result or "No results found.",
+ }),
+ ),
+ ),
+ response_type=Event,
+ start_to_close_timeout=timedelta(seconds=30),
+ )
+
+ self._complete_task = True
+
+ @workflow.run
+ async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]:
+ logger.info("Docs research task created: %s", params)
+ self._task_id = params.task.id
+
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content="Docs Research agent initialized. Send a research query to begin.",
+ ),
+ )
+
+ await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+ await workflow.wait_condition(lambda: workflow.all_handlers_finished())
+
+ return {"status": "complete", "result": self._research_result}
+
+ def _extract_payload(self, params: SendEventParams) -> dict:
+ if params.event.content and hasattr(params.event.content, "content"):
+ raw_content = params.event.content.content or ""
+ else:
+ raw_content = ""
+ if isinstance(raw_content, dict):
+ return raw_content
+ if isinstance(raw_content, str):
+ try:
+ return json.loads(raw_content)
+ except json.JSONDecodeError:
+ return {"raw_content": raw_content}
+ return {"raw_content": str(raw_content)}
diff --git a/examples/demos/deep_research/docs_researcher/pyproject.toml b/examples/demos/deep_research/docs_researcher/pyproject.toml
new file mode 100644
index 000000000..5dc7b64a1
--- /dev/null
+++ b/examples/demos/deep_research/docs_researcher/pyproject.toml
@@ -0,0 +1,36 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "deep-research-docs"
+version = "0.1.0"
+description = "Documentation research subagent using web search and page fetching"
+requires-python = ">=3.12"
+dependencies = [
+ "agentex-sdk>=0.6.5",
+ "openai-agents>=0.4.2",
+ "temporalio>=1.18.2",
+ "scale-gp",
+ "httpx",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest",
+ "black",
+ "isort",
+ "flake8",
+ "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88
diff --git a/examples/demos/deep_research/github_researcher/.dockerignore b/examples/demos/deep_research/github_researcher/.dockerignore
new file mode 100644
index 000000000..c49489471
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/.dockerignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Environments
+.env*
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Git
+.git
+.gitignore
+
+# Misc
+.DS_Store
diff --git a/examples/demos/deep_research/github_researcher/Dockerfile b/examples/demos/deep_research/github_researcher/Dockerfile
new file mode 100644
index 000000000..d1a5005d6
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/Dockerfile
@@ -0,0 +1,42 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies (includes nodejs/npm for GitHub MCP server)
+RUN apt-get update && apt-get install -y \
+ htop \
+ vim \
+ curl \
+ tar \
+ python3-dev \
+ build-essential \
+ gcc \
+ cmake \
+ netcat-openbsd \
+ nodejs \
+ npm \
+ && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install tctl (Temporal CLI)
+RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \
+ tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \
+ chmod +x /usr/local/bin/tctl && \
+ rm /tmp/tctl.tar.gz
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+COPY deep_research/github_researcher/pyproject.toml /app/github_researcher/pyproject.toml
+
+WORKDIR /app/github_researcher
+
+COPY deep_research/github_researcher/project /app/github_researcher/project
+
+RUN uv pip install --system .
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-github
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/github_researcher/environments.yaml b/examples/demos/deep_research/github_researcher/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+ dev:
+ auth:
+ principal:
+ user_id: # TODO: Fill in
+ account_id: # TODO: Fill in
+ helm_overrides:
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
+ temporal-worker:
+ enabled: true
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/github_researcher/manifest.yaml b/examples/demos/deep_research/github_researcher/manifest.yaml
new file mode 100644
index 000000000..b3b499d04
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/manifest.yaml
@@ -0,0 +1,59 @@
+# Agent Manifest Configuration
+
+build:
+ context:
+ root: ../../
+ include_paths:
+ - deep_research/github_researcher
+ dockerfile: deep_research/github_researcher/Dockerfile
+ dockerignore: deep_research/github_researcher/.dockerignore
+
+local_development:
+ agent:
+ port: 8011
+ host_address: host.docker.internal
+
+ paths:
+ acp: project/acp.py
+ worker: project/run_worker.py
+
+agent:
+ acp_type: async
+ name: deep-research-github
+ description: Searches GitHub repositories for code, issues, and PRs
+
+ temporal:
+ enabled: true
+ workflows:
+ - name: deep-research-github
+ queue_name: deep_research_github_queue
+
+ credentials:
+ - env_var_name: REDIS_URL
+ secret_name: redis-url-secret
+ secret_key: url
+
+ env:
+ OPENAI_ORG_ID: ""
+
+deployment:
+ image:
+ repository: ""
+ tag: "latest"
+
+ imagePullSecrets:
+ - name: my-registry-secret
+
+ global:
+ agent:
+ name: "deep-research-github"
+ description: "Searches GitHub repositories for code, issues, and PRs"
+
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/github_researcher/project/__init__.py b/examples/demos/deep_research/github_researcher/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/github_researcher/project/acp.py b/examples/demos/deep_research/github_researcher/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+ TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+ acp_type="async",
+ config=TemporalACPConfig(
+ type="temporal",
+ temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+ plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+ interceptors=[context_interceptor],
+ ),
+)
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/github_researcher/project/run_worker.py b/examples/demos/deep_research/github_researcher/project/run_worker.py
new file mode 100644
index 000000000..f19090ed3
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/run_worker.py
@@ -0,0 +1,66 @@
+import asyncio
+import os
+
+from agents.mcp import MCPServerStdio
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters, StatelessMCPServerProvider
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import GitHubResearchWorkflow
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+ task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+ if task_queue_name is None:
+ raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+ github_token = os.environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", "")
+
+ github_server = StatelessMCPServerProvider(
+ name="GitHubServer",
+ server_factory=lambda: MCPServerStdio(
+ name="GitHubServer",
+ params={
+ "command": "npx",
+ "args": ["-y", "@modelcontextprotocol/server-github"],
+ "env": {**os.environ, "GITHUB_PERSONAL_ACCESS_TOKEN": github_token},
+ },
+ ),
+ )
+
+ all_activities = get_all_activities()
+
+ context_interceptor = ContextInterceptor()
+
+ model_params = ModelActivityParameters(
+ start_to_close_timeout=timedelta(minutes=10),
+ )
+
+ worker = AgentexWorker(
+ task_queue=task_queue_name,
+ plugins=[
+ OpenAIAgentsPlugin(
+ model_params=model_params,
+ mcp_server_providers=[github_server],
+ ),
+ ],
+ interceptors=[context_interceptor],
+ )
+
+ await worker.run(
+ activities=all_activities,
+ workflow=GitHubResearchWorkflow,
+ )
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/demos/deep_research/github_researcher/project/summarization.py b/examples/demos/deep_research/github_researcher/project/summarization.py
new file mode 100644
index 000000000..a7215e595
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/summarization.py
@@ -0,0 +1,129 @@
+"""
+Conversation compaction for research agents.
+
+Keeps the conversation under Temporal's ~2MB payload limit by compacting
+old tool outputs between batch iterations of Runner.run().
+"""
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List, Optional
+
+from agents import Agent
+
+from agentex.lib.utils.logging import make_logger
+
+logger = make_logger(__name__)
+
+# Trigger compaction when serialized conversation exceeds this size (bytes).
+# Temporal payload limit is ~2MB; we compact well before that.
+COMPACTION_BYTE_THRESHOLD = 800_000 # 800 KB
+
+# Always keep the last N tool outputs in full (most recent context for the model).
+KEEP_RECENT_OUTPUTS = 3
+
+# Stub text that replaces truncated tool outputs.
+TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]"
+
+
+def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int:
+ """Estimate the serialized byte size of the conversation."""
+ try:
+ return len(json.dumps(input_list, default=str))
+ except Exception:
+ return sum(len(str(item)) for item in input_list)
+
+
+def should_compact(input_list: List[Dict[str, Any]]) -> bool:
+ """Check if the conversation payload exceeds the compaction threshold."""
+ size = estimate_payload_size(input_list)
+ logger.info("Conversation payload size: %d bytes", size)
+ return size > COMPACTION_BYTE_THRESHOLD
+
+
+def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """
+ Replace old tool outputs with short stubs to reduce payload size.
+
+ Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full.
+ Older outputs are replaced with a truncation stub.
+
+ Works with both Responses API format (function_call_output) and
+ Chat Completions format (role=tool).
+ """
+ # Find indices of tool output items
+ output_indices = []
+ for i, item in enumerate(input_list):
+ if not isinstance(item, dict):
+ continue
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_indices.append(i)
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ output_indices.append(i)
+
+ if len(output_indices) <= KEEP_RECENT_OUTPUTS:
+ logger.info("Only %d tool outputs, no compaction needed", len(output_indices))
+ return input_list
+
+ # Truncate all but the most recent N outputs
+ indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS]
+ compacted = list(input_list) # shallow copy
+
+ for idx in indices_to_truncate:
+ item = compacted[idx]
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_val = item.get("output", "")
+ if len(str(output_val)) > 200:
+ compacted[idx] = {**item, "output": TRUNCATED_STUB}
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ content_val = item.get("content", "")
+ if len(str(content_val)) > 200:
+ compacted[idx] = {**item, "content": TRUNCATED_STUB}
+
+ before = estimate_payload_size(input_list)
+ after = estimate_payload_size(compacted)
+ logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)",
+ before, after, len(indices_to_truncate))
+ return compacted
+
+
+def new_summarization_agent() -> Agent:
+ """Create a lightweight agent that summarizes research findings."""
+ return Agent(
+ name="ResearchSummarizer",
+ instructions="""Summarize the research conversation concisely. Focus on:
+- Key findings and code references discovered
+- File paths, function names, and relevant snippets
+- What questions were answered and what gaps remain
+- Current state of the research
+
+Be comprehensive but concise (3-5 paragraphs). Focus on OUTCOMES, not listing every tool call.""",
+ model="gpt-4.1-mini",
+ tools=[],
+ )
+
+
+def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]:
+ """Find the index of the last summary message."""
+ for i in range(len(input_list) - 1, -1, -1):
+ item = input_list[i]
+ if isinstance(item, dict) and item.get("_summary") is True:
+ return i
+ return None
+
+
+def apply_summary_to_input_list(
+ input_list: List[Dict[str, Any]],
+ summary_text: str,
+ original_query: str,
+) -> List[Dict[str, Any]]:
+ """Replace the conversation with a summary + resume instruction."""
+ return [
+ {"role": "user", "content": original_query},
+ {"role": "assistant", "content": summary_text, "_summary": True},
+ {"role": "user", "content": "Use the above summary of your previous research to continue. If you have enough information, provide your final synthesis. Otherwise, continue searching.", "_synthetic": True},
+ ]
diff --git a/examples/demos/deep_research/github_researcher/project/workflow.py b/examples/demos/deep_research/github_researcher/project/workflow.py
new file mode 100644
index 000000000..5b0628185
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/project/workflow.py
@@ -0,0 +1,256 @@
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner, ModelSettings
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.contrib import openai_agents
+from temporalio.workflow import ActivityConfig
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+ ACPActivityName,
+ EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+from project.summarization import (
+ should_compact,
+ compact_tool_outputs,
+ new_summarization_agent,
+ apply_summary_to_input_list,
+)
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+ raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+ raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+
+# Update this prompt to target your own repositories
+SYSTEM_PROMPT = """You are a GitHub research specialist. Search GitHub repositories to find relevant code, documentation, issues, and pull requests.
+
+You have access to the GitHub MCP server which lets you search code, read files, list issues, and explore repositories.
+
+TARGET REPOSITORIES:
+- scale-agentex - The AgentEx platform source code
+- scale-agentex-python - The AgentEx Python SDK
+
+RULES:
+1. Make ONE tool call at a time - never call multiple tools in parallel
+2. ONLY use search_code - the search snippets contain enough context
+3. NEVER use get_file_contents - it returns too much data and will crash the workflow
+4. Keep search queries simple and short (e.g. "BaseWorkflow repo:scaleapi/scale-agentex")
+5. After 4-5 searches, produce your final answer as plain text (no tool call)
+
+GUIDELINES:
+- Search both repos when relevant - answers often span platform and SDK
+- Avoid special characters or complex boolean syntax in queries
+- If a search returns nothing, try alternative terms
+- If you see a summary of previous research, build on it rather than repeating searches
+
+OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end:
+
+**Findings:**
+[Your analysis and explanations here]
+
+**Sources:**
+For every piece of information, cite the source using this format:
+- `repo_name/path/to/file.py` (lines X-Y) - brief description of what was found
+- `repo_name#issue_number` - brief description if from an issue/PR"""
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class GitHubResearchWorkflow(BaseWorkflow):
+ def __init__(self) -> None:
+ super().__init__(display_name=environment_variables.AGENT_NAME)
+ self._complete_task = False
+ self._research_result: str | None = None
+ self._task_id: str | None = None
+ self._trace_id: str | None = None
+ self._parent_span_id: str | None = None
+ self._input_list: List[Dict[str, Any]] = []
+
+ @workflow.signal(name=SignalName.RECEIVE_EVENT)
+ async def on_task_event_send(self, params: SendEventParams) -> None:
+ logger.info("GitHub researcher received event: %s", params)
+ self._task_id = params.task.id
+ self._trace_id = params.task.id
+ self._parent_span_id = params.task.id
+
+ payload = self._extract_payload(params)
+
+ parent_task_id = None
+ parent_agent_name = None
+ if params.task and getattr(params.task, "params", None):
+ parent_task_id = params.task.params.get("source_task_id")
+ parent_agent_name = params.task.params.get("parent_agent_name")
+ if not parent_task_id:
+ parent_task_id = payload.get("source_task_id")
+ if not parent_agent_name:
+ parent_agent_name = payload.get("parent_agent_name")
+
+ # Write messages to the parent task so everything appears in one conversation
+ message_task_id = parent_task_id or params.task.id
+
+ query = payload.get("query", payload.get("raw_content", ""))
+ if not query:
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content="No research query provided."),
+ )
+ self._complete_task = True
+ return
+
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content=f"Starting GitHub research for: {query}"),
+ )
+
+ self._input_list.append({"role": "user", "content": query})
+
+ # Reference MCP server by name (registered on the worker via StatelessMCPServerProvider)
+ github_server = openai_agents.workflow.stateless_mcp_server(
+ "GitHubServer",
+ config=ActivityConfig(
+ start_to_close_timeout=timedelta(minutes=10),
+ retry_policy=RetryPolicy(
+ maximum_attempts=3,
+ initial_interval=timedelta(seconds=60),
+ backoff_coefficient=2.0,
+ ),
+ ),
+ )
+
+ agent = Agent(
+ name="GitHubResearcher",
+ instructions=SYSTEM_PROMPT,
+ model="gpt-4.1-mini",
+ mcp_servers=[github_server],
+ model_settings=ModelSettings(parallel_tool_calls=False),
+ )
+
+ hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2))
+
+        # Run in batches so the conversation stays under Temporal's ~2MB payload limit.
+ # Between batches, compact old tool outputs so the conversation stays small.
+ TURNS_PER_BATCH = 7
+ MAX_BATCHES = 5
+
+ for batch_num in range(MAX_BATCHES):
+ try:
+ result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH)
+ self._input_list = result.to_input_list()
+
+ if result.final_output:
+ self._research_result = result.final_output
+ break
+ except Exception as e:
+ error_msg = str(e)
+ if "Max turns" in error_msg:
+ logger.warning("Batch %d hit max turns, attempting synthesis", batch_num)
+ try:
+ synth_input = self._input_list + [
+ {"role": "user", "content": "You've done enough research. Synthesize ALL your findings and provide your final comprehensive answer now."}
+ ]
+ synth_result = await Runner.run(agent, synth_input, max_turns=2)
+ self._research_result = synth_result.final_output or "Research incomplete."
+ except Exception:
+ self._research_result = f"GitHub research exceeded turn limits after {batch_num + 1} batches."
+ break
+ else:
+ logger.warning("GitHub research error in batch %d: %s", batch_num, e)
+ self._research_result = f"GitHub research was partially completed but encountered an error: {e}"
+ break
+
+ # Compact conversation between batches to prevent payload growth
+ if should_compact(self._input_list):
+ logger.info("Compacting conversation after batch %d", batch_num)
+ self._input_list = compact_tool_outputs(self._input_list)
+
+ # If still too large after truncation, use summarization agent
+ if should_compact(self._input_list):
+ logger.info("Still too large after truncation, running summarization agent")
+ try:
+ summary_agent = new_summarization_agent()
+ summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1)
+ if summary_result.final_output:
+ self._input_list = apply_summary_to_input_list(
+ self._input_list, summary_result.final_output, query
+ )
+ except Exception as se:
+ logger.warning("Summarization failed: %s", se)
+ else:
+ # Exhausted all batches without final output
+ if not self._research_result:
+ self._research_result = "GitHub research reached maximum iterations without producing a final result."
+
+ # Send completion event back to parent orchestrator
+ if parent_task_id and parent_agent_name:
+ await ActivityHelpers.execute_activity(
+ activity_name=ACPActivityName.EVENT_SEND,
+ request=EventSendParams(
+ agent_name=parent_agent_name,
+ task_id=parent_task_id,
+ content=TextContent(
+ author="agent",
+ content=json.dumps({
+ "event_type": "research_complete",
+ "source_agent": environment_variables.AGENT_NAME,
+ "child_task_id": params.task.id,
+ "result": self._research_result or "No results found.",
+ }),
+ ),
+ ),
+ response_type=Event,
+ start_to_close_timeout=timedelta(seconds=30),
+ )
+
+ self._complete_task = True
+
+ @workflow.run
+ async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]:
+ logger.info("GitHub research task created: %s", params)
+ self._task_id = params.task.id
+
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content="GitHub Research agent initialized. Send a research query to begin.",
+ ),
+ )
+
+ await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+ await workflow.wait_condition(lambda: workflow.all_handlers_finished())
+
+ return {"status": "complete", "result": self._research_result}
+
+ def _extract_payload(self, params: SendEventParams) -> dict:
+ if params.event.content and hasattr(params.event.content, "content"):
+ raw_content = params.event.content.content or ""
+ else:
+ raw_content = ""
+ if isinstance(raw_content, dict):
+ return raw_content
+ if isinstance(raw_content, str):
+ try:
+ return json.loads(raw_content)
+ except json.JSONDecodeError:
+ return {"raw_content": raw_content}
+ return {"raw_content": str(raw_content)}
diff --git a/examples/demos/deep_research/github_researcher/pyproject.toml b/examples/demos/deep_research/github_researcher/pyproject.toml
new file mode 100644
index 000000000..6d14961d3
--- /dev/null
+++ b/examples/demos/deep_research/github_researcher/pyproject.toml
@@ -0,0 +1,35 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "deep-research-github"
+version = "0.1.0"
+description = "GitHub research subagent using MCP server"
+requires-python = ">=3.12"
+dependencies = [
+ "agentex-sdk>=0.6.5",
+ "openai-agents>=0.4.2",
+ "temporalio>=1.18.2",
+ "scale-gp",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest",
+ "black",
+ "isort",
+ "flake8",
+ "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88
diff --git a/examples/demos/deep_research/orchestrator/.dockerignore b/examples/demos/deep_research/orchestrator/.dockerignore
new file mode 100644
index 000000000..c49489471
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/.dockerignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Environments
+.env*
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Git
+.git
+.gitignore
+
+# Misc
+.DS_Store
diff --git a/examples/demos/deep_research/orchestrator/Dockerfile b/examples/demos/deep_research/orchestrator/Dockerfile
new file mode 100644
index 000000000..0a7602944
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/Dockerfile
@@ -0,0 +1,40 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+ htop \
+ vim \
+ curl \
+ tar \
+ python3-dev \
+ build-essential \
+ gcc \
+ cmake \
+ netcat-openbsd \
+ && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install tctl (Temporal CLI)
+RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \
+ tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \
+ chmod +x /usr/local/bin/tctl && \
+ rm /tmp/tctl.tar.gz
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+COPY deep_research/orchestrator/pyproject.toml /app/orchestrator/pyproject.toml
+
+WORKDIR /app/orchestrator
+
+COPY deep_research/orchestrator/project /app/orchestrator/project
+
+RUN uv pip install --system .
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-orchestrator
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/orchestrator/environments.yaml b/examples/demos/deep_research/orchestrator/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+ dev:
+ auth:
+ principal:
+ user_id: # TODO: Fill in
+ account_id: # TODO: Fill in
+ helm_overrides:
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
+ temporal-worker:
+ enabled: true
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/orchestrator/manifest.yaml b/examples/demos/deep_research/orchestrator/manifest.yaml
new file mode 100644
index 000000000..b3727e20d
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/manifest.yaml
@@ -0,0 +1,64 @@
+# Agent Manifest Configuration
+# ---------------------------
+# This file defines how your agent should be built and deployed.
+
+build:
+ context:
+ root: ../../
+ include_paths:
+ - deep_research/orchestrator
+ dockerfile: deep_research/orchestrator/Dockerfile
+ dockerignore: deep_research/orchestrator/.dockerignore
+
+local_development:
+ agent:
+ port: 8010
+ host_address: host.docker.internal
+
+ paths:
+ acp: project/acp.py
+ worker: project/run_worker.py
+
+agent:
+ acp_type: async
+ name: deep-research-orchestrator
+ description: Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents
+
+ temporal:
+ enabled: true
+ workflows:
+ - name: deep-research-orchestrator
+ queue_name: deep_research_orchestrator_queue
+
+ credentials:
+ - env_var_name: REDIS_URL
+ secret_name: redis-url-secret
+ secret_key: url
+ # - env_var_name: OPENAI_API_KEY
+ # secret_name: openai-api-key
+ # secret_key: api-key
+
+ env:
+ OPENAI_ORG_ID: ""
+
+deployment:
+ image:
+ repository: ""
+ tag: "latest"
+
+ imagePullSecrets:
+ - name: my-registry-secret
+
+ global:
+ agent:
+ name: "deep-research-orchestrator"
+ description: "Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents"
+
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/orchestrator/project/__init__.py b/examples/demos/deep_research/orchestrator/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/orchestrator/project/acp.py b/examples/demos/deep_research/orchestrator/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+ TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+ acp_type="async",
+ config=TemporalACPConfig(
+ type="temporal",
+ temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+ plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+ interceptors=[context_interceptor],
+ ),
+)
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/orchestrator/project/prompts.py b/examples/demos/deep_research/orchestrator/project/prompts.py
new file mode 100644
index 000000000..770d79376
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/prompts.py
@@ -0,0 +1,30 @@
+ORCHESTRATOR_SYSTEM_PROMPT = """You are the lead research coordinator. Dispatch specialized research agents and synthesize their findings.
+
+You have 3 research agents:
+
+1. **GitHub Researcher** (dispatch_github_researcher) - Searches scale-agentex (platform) and scale-agentex-python (SDK) repos for code, issues, PRs
+2. **Docs Researcher** (dispatch_docs_researcher) - Searches official AgentEx docs and DeepWiki AI-generated docs
+3. **Slack Researcher** (dispatch_slack_researcher) - Searches configured Slack channels for team discussions
+
+STRATEGY:
+
+1. **Plan**: Analyze the question and decide which agents to dispatch (2-3 agents)
+2. **Dispatch in parallel**: Call multiple dispatch tools simultaneously. Give each agent a SPECIFIC, TARGETED query - not just the user's raw question.
+3. **Evaluate**: After receiving results, assess completeness. Are there gaps? Contradictions? Need more detail?
+4. **Iterate if needed**: Dispatch again with refined queries if results are incomplete or you need to cross-check findings.
+5. **Synthesize**: Once you have comprehensive results, produce your final answer.
+
+QUERY TIPS:
+- For GitHub: mention specific class names, function names, or patterns to search for
+- For Docs: mention specific topics, concepts, or doc page names
+- For Slack: mention specific keywords or topics people would discuss
+
+OUTPUT FORMAT: After receiving all results, write a comprehensive answer with:
+
+1. **Answer** - Clear, direct answer to the question
+2. **Details** - Key findings organized by topic, with code examples where relevant
+3. **Sources** - IMPORTANT: Preserve ALL source citations from the research agents. Organize them by type:
+ - **Code:** `repo/path/file.py` (lines X-Y) - description
+ - **Docs:** [Page title](URL) - description
+ - **Slack:** #channel, @user, date - description
+4. **Gaps** - Any areas where information was limited or unclear"""
diff --git a/examples/demos/deep_research/orchestrator/project/run_worker.py b/examples/demos/deep_research/orchestrator/project/run_worker.py
new file mode 100644
index 000000000..bbadf5e8f
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/run_worker.py
@@ -0,0 +1,55 @@
+import asyncio
+
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+ TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import ResearchOrchestratorWorkflow
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+ task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+ if task_queue_name is None:
+ raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+ all_activities = get_all_activities() + [stream_lifecycle_content]
+
+ context_interceptor = ContextInterceptor()
+ streaming_model_provider = TemporalStreamingModelProvider()
+
+ model_params = ModelActivityParameters(
+ start_to_close_timeout=timedelta(minutes=10),
+ )
+
+ worker = AgentexWorker(
+ task_queue=task_queue_name,
+ plugins=[
+ OpenAIAgentsPlugin(
+ model_params=model_params,
+ model_provider=streaming_model_provider,
+ ),
+ ],
+ interceptors=[context_interceptor],
+ )
+
+ await worker.run(
+ activities=all_activities,
+ workflow=ResearchOrchestratorWorkflow,
+ )
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/demos/deep_research/orchestrator/project/workflow.py b/examples/demos/deep_research/orchestrator/project/workflow.py
new file mode 100644
index 000000000..0f07ed97f
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/project/workflow.py
@@ -0,0 +1,235 @@
+from __future__ import annotations
+
+import asyncio
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner, ModelSettings, function_tool
+from openai.types.shared import Reasoning
+from temporalio import workflow
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+ ACPActivityName,
+ EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.prompts import ORCHESTRATOR_SYSTEM_PROMPT
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+ raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+ raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+# Update these to match your subagent names from their manifest.yaml files
+GITHUB_AGENT_NAME = "deep-research-github"
+DOCS_AGENT_NAME = "deep-research-docs"
+SLACK_AGENT_NAME = "deep-research-slack"
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class ResearchOrchestratorWorkflow(BaseWorkflow):
+ """Orchestrates deep research by dispatching GitHub, Docs, and Slack subagents."""
+
+ def __init__(self) -> None:
+ super().__init__(display_name=environment_variables.AGENT_NAME)
+ self._complete_task = False
+ self._research_result: str | None = None
+ self._task_id: str | None = None
+ self._trace_id: str | None = None
+ self._parent_span_id: str | None = None
+ self._agent_id: str | None = None
+ self._input_list: List[Dict[str, Any]] = []
+ # Stores results from subagents keyed by child_task_id
+ self._subagent_results: Dict[str, str] = {}
+
+ @workflow.signal(name=SignalName.RECEIVE_EVENT)
+ async def on_task_event_send(self, params: SendEventParams) -> None:
+ logger.info("Orchestrator received event: %s", params)
+
+ if self._task_id is None:
+ self._task_id = params.task.id
+ if self._trace_id is None:
+ self._trace_id = params.task.id
+ if self._parent_span_id is None:
+ self._parent_span_id = params.task.id
+ if self._agent_id is None and getattr(params, "agent", None):
+ self._agent_id = params.agent.id
+
+ payload = self._extract_payload(params)
+
+ # Check if this is a research_complete event from a subagent
+ event_type = payload.get("event_type")
+ if event_type == "research_complete":
+ child_task_id = payload.get("child_task_id", "")
+ result = payload.get("result", "")
+ source = payload.get("source_agent", "unknown")
+ self._subagent_results[child_task_id] = result
+ logger.info("Received research result from %s (child: %s)", source, child_task_id)
+ return
+
+ # Otherwise, this is a user query
+ query = payload.get("query", payload.get("raw_content", ""))
+ if not query:
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content="No research query provided. Please send a question.",
+ ),
+ )
+ return
+
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content=(
+ f"Starting deep research for: {query}\n\n"
+ "I'll dispatch specialized research agents to search GitHub, documentation, and Slack."
+ ),
+ ),
+ )
+
+ self._input_list.append({"role": "user", "content": query})
+
+ # Create dispatch tools that operate within this workflow's context
+        dispatch_github = self._make_dispatch_tool(
+            GITHUB_AGENT_NAME,
+            "dispatch_github_researcher",
+            "Dispatch the GitHub research agent to search across GitHub repos. "
+            "Returns comprehensive findings from code, issues, and PRs.",
+        )
+
+        dispatch_docs = self._make_dispatch_tool(
+            DOCS_AGENT_NAME,
+            "dispatch_docs_researcher",
+            "Dispatch the Docs research agent to search documentation. "
+            "Returns findings from documentation sources.",
+        )
+
+        dispatch_slack = self._make_dispatch_tool(
+            SLACK_AGENT_NAME,
+            "dispatch_slack_researcher",
+            "Dispatch the Slack research agent to search Slack channels. "
+            "Returns findings from team discussions.",
+        )
+
+ agent = Agent(
+ name="ResearchOrchestrator",
+ instructions=ORCHESTRATOR_SYSTEM_PROMPT,
+ model="gpt-5.1",
+ tools=[dispatch_github, dispatch_docs, dispatch_slack],
+ model_settings=ModelSettings(
+ reasoning=Reasoning(effort="high", summary="auto"),
+ ),
+ )
+
+ hooks = TemporalStreamingHooks(task_id=params.task.id, timeout=timedelta(minutes=2))
+ result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=50)
+
+ self._research_result = result.final_output
+ self._input_list = result.to_input_list()
+ self._complete_task = True
+
+ @workflow.run
+ async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]:
+ logger.info("Research orchestrator task created: %s", params)
+ self._task_id = params.task.id
+ self._agent_id = params.agent.id
+
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content=(
+ "Deep Research Orchestrator initialized.\n\n"
+ "Send me a question and I'll coordinate research across "
+ "GitHub repos, official documentation, and Slack discussions to give you "
+ "a comprehensive answer."
+ ),
+ ),
+ )
+
+ await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+ await workflow.wait_condition(lambda: workflow.all_handlers_finished())
+
+ return {"status": "complete", "result": self._research_result}
+
+ def _make_dispatch_tool(self, agent_name: str, tool_name: str, description: str):
+ """Create a @function_tool that dispatches to a subagent via ACP and waits for results."""
+ workflow_instance = self
+
+ @function_tool(name_override=tool_name, description_override=description)
+ async def dispatch(query: str) -> str:
+ """
+ Args:
+ query: The specific research query for this subagent. Be specific about
+ what to search for - include repo names, class/function names,
+ doc page names, or channel names when possible.
+ """
+ logger.info("Dispatching %s with query: %s", agent_name, query)
+
+ # Create a child task via ACP activity.
+ # Pass source_task_id so the subagent can write messages to our task.
+ task = await adk.acp.create_task(
+ name=f"{agent_name}-{workflow.uuid4()}",
+ agent_name=agent_name,
+ params={
+ "source_task_id": workflow_instance._task_id,
+ "parent_agent_name": environment_variables.AGENT_NAME,
+ },
+ )
+ child_task_id = task.id
+ logger.info("Created child task %s for %s", child_task_id, agent_name)
+
+ # Send the query as an event to the child agent
+ await ActivityHelpers.execute_activity(
+ activity_name=ACPActivityName.EVENT_SEND,
+ request=EventSendParams(
+ agent_name=agent_name,
+ task_id=child_task_id,
+ content=TextContent(
+ author="user",
+ content=json.dumps({"query": query}),
+ ),
+ ),
+ response_type=Event,
+ start_to_close_timeout=timedelta(seconds=30),
+ )
+
+ # Wait for the subagent to send back a completion event
+ # The signal handler stores results in _subagent_results by child_task_id
+ try:
+ await workflow.wait_condition(
+ lambda: child_task_id in workflow_instance._subagent_results,
+ timeout=timedelta(minutes=10),
+ )
+ except asyncio.TimeoutError:
+ return f"Research from {agent_name} timed out after 10 minutes."
+
+ result = workflow_instance._subagent_results.get(child_task_id, "No result received.")
+ logger.info("Got result from %s (length: %d)", agent_name, len(result))
+ return result
+
+ return dispatch
+
+ def _extract_payload(self, params: SendEventParams) -> dict:
+ if params.event.content and hasattr(params.event.content, "content"):
+ raw_content = params.event.content.content or ""
+ else:
+ raw_content = ""
+ if isinstance(raw_content, dict):
+ return raw_content
+ if isinstance(raw_content, str):
+ try:
+ return json.loads(raw_content)
+ except json.JSONDecodeError:
+ return {"raw_content": raw_content}
+ return {"raw_content": str(raw_content)}
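The dispatch tool above creates a child task, sends the query, then blocks on `workflow.wait_condition` until the signal handler records a result under that child task id. Outside Temporal, the same wait-for-keyed-result pattern can be sketched with plain asyncio (a hypothetical helper for illustration, not part of the SDK):

```python
import asyncio


async def wait_for_result(results: dict, key: str, timeout: float) -> str:
    """Resolve once results[key] appears, mirroring workflow.wait_condition
    with a timeout; return a timeout message instead of raising."""

    async def condition_met() -> None:
        # Poll the shared dict, yielding to the event loop between checks.
        while key not in results:
            await asyncio.sleep(0.01)

    try:
        await asyncio.wait_for(condition_met(), timeout=timeout)
    except asyncio.TimeoutError:
        return f"Research timed out after {timeout} seconds."
    return results[key]
```

In the workflow itself, Temporal replays make this safe across worker restarts; the asyncio version only captures the control flow, not the durability.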
diff --git a/examples/demos/deep_research/orchestrator/pyproject.toml b/examples/demos/deep_research/orchestrator/pyproject.toml
new file mode 100644
index 000000000..417b0bcac
--- /dev/null
+++ b/examples/demos/deep_research/orchestrator/pyproject.toml
@@ -0,0 +1,35 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "deep-research-orchestrator"
+version = "0.1.0"
+description = "Orchestrates deep research by dispatching specialized subagents"
+requires-python = ">=3.12"
+dependencies = [
+ "agentex-sdk>=0.6.5",
+ "openai-agents>=0.4.2",
+ "temporalio>=1.18.2",
+ "scale-gp",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest",
+ "black",
+ "isort",
+ "flake8",
+ "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88
diff --git a/examples/demos/deep_research/slack_researcher/.dockerignore b/examples/demos/deep_research/slack_researcher/.dockerignore
new file mode 100644
index 000000000..c49489471
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/.dockerignore
@@ -0,0 +1,43 @@
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# Environments
+.env*
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# IDE
+.idea/
+.vscode/
+*.swp
+*.swo
+
+# Git
+.git
+.gitignore
+
+# Misc
+.DS_Store
diff --git a/examples/demos/deep_research/slack_researcher/Dockerfile b/examples/demos/deep_research/slack_researcher/Dockerfile
new file mode 100644
index 000000000..be8e2bb9e
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/Dockerfile
@@ -0,0 +1,42 @@
+# syntax=docker/dockerfile:1.3
+FROM python:3.12-slim
+COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/
+
+# Install system dependencies (includes nodejs/npm for Slack MCP server)
+RUN apt-get update && apt-get install -y \
+ htop \
+ vim \
+ curl \
+ tar \
+ python3-dev \
+ build-essential \
+ gcc \
+ cmake \
+ netcat-openbsd \
+ nodejs \
+ npm \
+ && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install tctl (Temporal CLI). Note: the URL below pins the linux_arm64 build; use linux_amd64 on x86_64 hosts.
+RUN curl -L https://github.com/temporalio/tctl/releases/download/v1.18.1/tctl_1.18.1_linux_arm64.tar.gz -o /tmp/tctl.tar.gz && \
+ tar -xzf /tmp/tctl.tar.gz -C /usr/local/bin && \
+ chmod +x /usr/local/bin/tctl && \
+ rm /tmp/tctl.tar.gz
+
+RUN uv pip install --system --upgrade pip setuptools wheel
+
+ENV UV_HTTP_TIMEOUT=1000
+
+COPY deep_research/slack_researcher/pyproject.toml /app/slack_researcher/pyproject.toml
+
+WORKDIR /app/slack_researcher
+
+COPY deep_research/slack_researcher/project /app/slack_researcher/project
+
+RUN uv pip install --system .
+
+ENV PYTHONPATH=/app
+ENV AGENT_NAME=deep-research-slack
+
+CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/examples/demos/deep_research/slack_researcher/environments.yaml b/examples/demos/deep_research/slack_researcher/environments.yaml
new file mode 100644
index 000000000..71fc1f08c
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/environments.yaml
@@ -0,0 +1,27 @@
+# Agent Environment Configuration
+schema_version: "v1"
+environments:
+ dev:
+ auth:
+ principal:
+ user_id: # TODO: Fill in
+ account_id: # TODO: Fill in
+ helm_overrides:
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
+ temporal-worker:
+ enabled: true
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/slack_researcher/manifest.yaml b/examples/demos/deep_research/slack_researcher/manifest.yaml
new file mode 100644
index 000000000..e305cf387
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/manifest.yaml
@@ -0,0 +1,59 @@
+# Agent Manifest Configuration
+
+build:
+ context:
+ root: ../../
+ include_paths:
+ - deep_research/slack_researcher
+ dockerfile: deep_research/slack_researcher/Dockerfile
+ dockerignore: deep_research/slack_researcher/.dockerignore
+
+local_development:
+ agent:
+ port: 8013
+ host_address: host.docker.internal
+
+ paths:
+ acp: project/acp.py
+ worker: project/run_worker.py
+
+agent:
+ acp_type: async
+ name: deep-research-slack
+ description: Searches Slack channels for relevant discussions and context
+
+ temporal:
+ enabled: true
+ workflows:
+ - name: deep-research-slack
+ queue_name: deep_research_slack_queue
+
+ credentials:
+ - env_var_name: REDIS_URL
+ secret_name: redis-url-secret
+ secret_key: url
+
+ env:
+ OPENAI_ORG_ID: ""
+
+deployment:
+ image:
+ repository: ""
+ tag: "latest"
+
+ imagePullSecrets:
+ - name: my-registry-secret
+
+ global:
+ agent:
+ name: "deep-research-slack"
+ description: "Searches Slack channels for relevant discussions and context"
+
+ replicaCount: 1
+ resources:
+ requests:
+ cpu: "500m"
+ memory: "1Gi"
+ limits:
+ cpu: "1000m"
+ memory: "2Gi"
diff --git a/examples/demos/deep_research/slack_researcher/project/__init__.py b/examples/demos/deep_research/slack_researcher/project/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/demos/deep_research/slack_researcher/project/acp.py b/examples/demos/deep_research/slack_researcher/project/acp.py
new file mode 100644
index 000000000..2b017f344
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/project/acp.py
@@ -0,0 +1,29 @@
+import os
+
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
+
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+ TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.types.fastacp import TemporalACPConfig
+
+context_interceptor = ContextInterceptor()
+streaming_model_provider = TemporalStreamingModelProvider()
+
+# Create the ACP server
+acp = FastACP.create(
+ acp_type="async",
+ config=TemporalACPConfig(
+ type="temporal",
+ temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
+ plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
+ interceptors=[context_interceptor],
+ ),
+)
+
+if __name__ == "__main__":
+ import uvicorn
+
+ uvicorn.run(acp, host="0.0.0.0", port=8000)
diff --git a/examples/demos/deep_research/slack_researcher/project/run_worker.py b/examples/demos/deep_research/slack_researcher/project/run_worker.py
new file mode 100644
index 000000000..e06fffb03
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/project/run_worker.py
@@ -0,0 +1,71 @@
+import asyncio
+import os
+
+from agents.mcp import MCPServerStdio
+from datetime import timedelta
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters, StatelessMCPServerProvider
+
+from agentex.lib.core.temporal.activities import get_all_activities
+from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.utils.logging import make_logger
+from agentex.lib.environment_variables import EnvironmentVariables
+
+from project.workflow import SlackResearchWorkflow
+
+environment_variables = EnvironmentVariables.refresh()
+
+logger = make_logger(__name__)
+
+
+async def main():
+ task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
+ if task_queue_name is None:
+ raise ValueError("WORKFLOW_TASK_QUEUE is not set")
+
+    slack_token = os.environ.get("SLACK_BOT_TOKEN", "")
+    slack_team_id = os.environ.get("SLACK_TEAM_ID", "")
+
+    if not slack_token:
+        logger.warning("SLACK_BOT_TOKEN is not set; Slack MCP tool calls will fail")
+
+    slack_env = {**os.environ, "SLACK_BOT_TOKEN": slack_token}
+ if slack_team_id:
+ slack_env["SLACK_TEAM_ID"] = slack_team_id
+
+ slack_server = StatelessMCPServerProvider(
+ name="SlackServer",
+ server_factory=lambda: MCPServerStdio(
+ name="SlackServer",
+ params={
+ "command": "npx",
+ "args": ["-y", "@modelcontextprotocol/server-slack"],
+ "env": slack_env,
+ },
+ ),
+ )
+
+ all_activities = get_all_activities()
+
+ context_interceptor = ContextInterceptor()
+
+ model_params = ModelActivityParameters(
+ start_to_close_timeout=timedelta(minutes=10),
+ )
+
+ worker = AgentexWorker(
+ task_queue=task_queue_name,
+ plugins=[
+ OpenAIAgentsPlugin(
+ model_params=model_params,
+ mcp_server_providers=[slack_server],
+ ),
+ ],
+ interceptors=[context_interceptor],
+ )
+
+ await worker.run(
+ activities=all_activities,
+ workflow=SlackResearchWorkflow,
+ )
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/demos/deep_research/slack_researcher/project/summarization.py b/examples/demos/deep_research/slack_researcher/project/summarization.py
new file mode 100644
index 000000000..a7215e595
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/project/summarization.py
@@ -0,0 +1,129 @@
+"""
+Conversation compaction for research agents.
+
+Prevents the Temporal payload size limit (~2MB) from being exceeded by compacting
+old tool outputs between batch iterations of Runner.run().
+"""
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, List, Optional
+
+from agents import Agent
+
+from agentex.lib.utils.logging import make_logger
+
+logger = make_logger(__name__)
+
+# Trigger compaction when serialized conversation exceeds this size (bytes).
+# Temporal payload limit is ~2MB; we compact well before that.
+COMPACTION_BYTE_THRESHOLD = 800_000 # 800 KB
+
+# Always keep the last N tool outputs in full (most recent context for the model).
+KEEP_RECENT_OUTPUTS = 3
+
+# Stub text that replaces truncated tool outputs.
+TRUNCATED_STUB = "[Previous tool output truncated. Key findings were incorporated into the assistant's analysis.]"
+
+
+def estimate_payload_size(input_list: List[Dict[str, Any]]) -> int:
+ """Estimate the serialized byte size of the conversation."""
+ try:
+ return len(json.dumps(input_list, default=str))
+ except Exception:
+ return sum(len(str(item)) for item in input_list)
+
+
+def should_compact(input_list: List[Dict[str, Any]]) -> bool:
+ """Check if the conversation payload exceeds the compaction threshold."""
+ size = estimate_payload_size(input_list)
+ logger.info("Conversation payload size: %d bytes", size)
+ return size > COMPACTION_BYTE_THRESHOLD
+
+
+def compact_tool_outputs(input_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """
+ Replace old tool outputs with short stubs to reduce payload size.
+
+ Keeps the most recent KEEP_RECENT_OUTPUTS tool outputs in full.
+ Older outputs are replaced with a truncation stub.
+
+ Works with both Responses API format (function_call_output) and
+ Chat Completions format (role=tool).
+ """
+ # Find indices of tool output items
+ output_indices = []
+ for i, item in enumerate(input_list):
+ if not isinstance(item, dict):
+ continue
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_indices.append(i)
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ output_indices.append(i)
+
+ if len(output_indices) <= KEEP_RECENT_OUTPUTS:
+ logger.info("Only %d tool outputs, no compaction needed", len(output_indices))
+ return input_list
+
+ # Truncate all but the most recent N outputs
+ indices_to_truncate = output_indices[:-KEEP_RECENT_OUTPUTS]
+ compacted = list(input_list) # shallow copy
+
+ for idx in indices_to_truncate:
+ item = compacted[idx]
+ # Responses API format
+ if item.get("type") == "function_call_output":
+ output_val = item.get("output", "")
+ if len(str(output_val)) > 200:
+ compacted[idx] = {**item, "output": TRUNCATED_STUB}
+ # Chat Completions format
+ elif item.get("role") == "tool":
+ content_val = item.get("content", "")
+ if len(str(content_val)) > 200:
+ compacted[idx] = {**item, "content": TRUNCATED_STUB}
+
+ before = estimate_payload_size(input_list)
+ after = estimate_payload_size(compacted)
+ logger.info("Compacted conversation: %d -> %d bytes (%d tool outputs truncated)",
+ before, after, len(indices_to_truncate))
+ return compacted
+
+
+def new_summarization_agent() -> Agent:
+ """Create a lightweight agent that summarizes research findings."""
+ return Agent(
+ name="ResearchSummarizer",
+ instructions="""Summarize the research conversation concisely. Focus on:
+- Key findings and code references discovered
+- File paths, function names, and relevant snippets
+- What questions were answered and what gaps remain
+- Current state of the research
+
+Be comprehensive but concise (3-5 paragraphs). Focus on OUTCOMES, not listing every tool call.""",
+ model="gpt-4.1-mini",
+ tools=[],
+ )
+
+
+def find_last_summary_index(input_list: List[Dict[str, Any]]) -> Optional[int]:
+ """Find the index of the last summary message."""
+ for i in range(len(input_list) - 1, -1, -1):
+ item = input_list[i]
+ if isinstance(item, dict) and item.get("_summary") is True:
+ return i
+ return None
+
+
+def apply_summary_to_input_list(
+    input_list: List[Dict[str, Any]],
+    summary_text: str,
+    original_query: str,
+) -> List[Dict[str, Any]]:
+    """Replace the conversation with a summary + resume instruction.
+
+    The prior input_list is discarded entirely; it is accepted as a parameter
+    so call sites read as a transformation of the existing conversation.
+    """
+    return [
+        {"role": "user", "content": original_query},
+        {"role": "assistant", "content": summary_text, "_summary": True},
+        {
+            "role": "user",
+            "content": (
+                "Use the above summary of your previous research to continue. "
+                "If you have enough information, provide your final synthesis. "
+                "Otherwise, continue searching."
+            ),
+            "_synthetic": True,
+        },
+    ]
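The compaction policy above (keep the last N tool outputs in full, stub out older ones) is easy to exercise in isolation. A condensed, self-contained sketch of the same logic with illustrative names:

```python
import json

STUB = "[truncated]"


def compact(items: list[dict], keep: int = 3) -> list[dict]:
    """Stub out all but the last `keep` tool outputs, handling both the
    Responses API shape (type=function_call_output) and Chat shape (role=tool)."""
    tool_idx = [
        i for i, item in enumerate(items)
        if item.get("type") == "function_call_output" or item.get("role") == "tool"
    ]
    compacted = list(items)  # shallow copy; originals stay intact
    for i in (tool_idx[:-keep] if keep else tool_idx):
        key = "output" if compacted[i].get("type") == "function_call_output" else "content"
        if len(str(compacted[i].get(key, ""))) > 200:
            compacted[i] = {**compacted[i], key: STUB}
    return compacted


def payload_size(items: list[dict]) -> int:
    """Rough serialized size in bytes, as used for the compaction trigger."""
    return len(json.dumps(items, default=str))
```

Short tool outputs (under 200 characters) are left alone even when old, since stubbing them would save little while losing context.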
diff --git a/examples/demos/deep_research/slack_researcher/project/workflow.py b/examples/demos/deep_research/slack_researcher/project/workflow.py
new file mode 100644
index 000000000..01314b615
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/project/workflow.py
@@ -0,0 +1,247 @@
+from __future__ import annotations
+
+import json
+from datetime import timedelta
+from typing import Any, Dict, List
+
+from agents import Agent, Runner
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.contrib import openai_agents
+from temporalio.workflow import ActivityConfig
+
+from agentex.lib import adk
+from agentex.lib.types.acp import CreateTaskParams, SendEventParams
+from agentex.lib.core.temporal.activities.activity_helpers import ActivityHelpers
+from agentex.lib.core.temporal.activities.adk.acp.acp_activities import (
+ ACPActivityName,
+ EventSendParams,
+)
+from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.temporal.types.workflow import SignalName
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+from agentex.lib.utils.logging import make_logger
+from agentex.types.event import Event
+from agentex.types.text_content import TextContent
+from agentex.lib.environment_variables import EnvironmentVariables
+from project.summarization import (
+ should_compact,
+ compact_tool_outputs,
+ new_summarization_agent,
+ apply_summary_to_input_list,
+)
+
+environment_variables = EnvironmentVariables.refresh()
+
+if environment_variables.WORKFLOW_NAME is None:
+ raise ValueError("Environment variable WORKFLOW_NAME is not set")
+
+if environment_variables.AGENT_NAME is None:
+ raise ValueError("Environment variable AGENT_NAME is not set")
+
+logger = make_logger(__name__)
+
+
+# Update this prompt with your own Slack channels and channel IDs
+SYSTEM_PROMPT = """You are a Slack research specialist. Search Slack for relevant discussions, threads, and context.
+
+You have access to the Slack MCP server which lets you search messages, read channels, and explore threads.
+
+KEY SLACK CHANNELS:
+- #your-main-channel (Channel ID: YOUR_CHANNEL_ID) - Main discussion channel
+- #your-help-channel (Channel ID: YOUR_CHANNEL_ID) - Help and support questions
+- #your-team-channel (Channel ID: YOUR_CHANNEL_ID) - Team discussions
+
+IMPORTANT: Use channel IDs (e.g. "C0123456789") not channel names when calling tools.
+
+RULES:
+1. Use slack_search_messages for searching - NEVER fetch full channel history (too large, will time out)
+2. After searching, read 2-3 of the most relevant threads for full context
+3. After 5-6 tool calls, produce your final answer as plain text (no tool call)
+
+GUIDELINES:
+- Search with different keyword variations to find relevant discussions
+- Note who said what and when for attribution
+- If you see a summary of previous research, build on it rather than repeating searches
+
+OUTPUT FORMAT - When done, write your findings with a **Sources** section at the end:
+
+**Findings:**
+[Your analysis of discussions and conclusions]
+
+**Sources:**
+For every piece of information, cite the Slack source:
+- #channel-name, @username, date - brief description of what was discussed"""
+
+
+@workflow.defn(name=environment_variables.WORKFLOW_NAME)
+class SlackResearchWorkflow(BaseWorkflow):
+ def __init__(self) -> None:
+ super().__init__(display_name=environment_variables.AGENT_NAME)
+ self._complete_task = False
+ self._research_result: str | None = None
+ self._task_id: str | None = None
+ self._trace_id: str | None = None
+ self._parent_span_id: str | None = None
+ self._input_list: List[Dict[str, Any]] = []
+
+ @workflow.signal(name=SignalName.RECEIVE_EVENT)
+ async def on_task_event_send(self, params: SendEventParams) -> None:
+ logger.info("Slack researcher received event: %s", params)
+ self._task_id = params.task.id
+ self._trace_id = params.task.id
+ self._parent_span_id = params.task.id
+
+ payload = self._extract_payload(params)
+
+ parent_task_id = None
+ parent_agent_name = None
+ if params.task and getattr(params.task, "params", None):
+ parent_task_id = params.task.params.get("source_task_id")
+ parent_agent_name = params.task.params.get("parent_agent_name")
+ if not parent_task_id:
+ parent_task_id = payload.get("source_task_id")
+ if not parent_agent_name:
+ parent_agent_name = payload.get("parent_agent_name")
+
+ # Write messages to the parent task so everything appears in one conversation
+ message_task_id = parent_task_id or params.task.id
+
+ query = payload.get("query", payload.get("raw_content", ""))
+ if not query:
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content="No research query provided."),
+ )
+ self._complete_task = True
+ return
+
+ await adk.messages.create(
+ task_id=message_task_id,
+ content=TextContent(author="agent", content=f"Starting Slack research for: {query}"),
+ )
+
+ self._input_list.append({"role": "user", "content": query})
+
+ # Reference MCP server by name (registered on the worker)
+ slack_server = openai_agents.workflow.stateless_mcp_server(
+ "SlackServer",
+ config=ActivityConfig(
+ start_to_close_timeout=timedelta(minutes=10),
+ retry_policy=RetryPolicy(
+ maximum_attempts=3,
+ initial_interval=timedelta(seconds=10),
+ backoff_coefficient=2.0,
+ ),
+ ),
+ )
+
+ agent = Agent(
+ name="SlackResearcher",
+ instructions=SYSTEM_PROMPT,
+ model="gpt-4.1-mini",
+ mcp_servers=[slack_server],
+ )
+
+ hooks = TemporalStreamingHooks(task_id=message_task_id, timeout=timedelta(minutes=2))
+
+ TURNS_PER_BATCH = 7
+ MAX_BATCHES = 5
+
+ for batch_num in range(MAX_BATCHES):
+ try:
+ result = await Runner.run(agent, self._input_list, hooks=hooks, max_turns=TURNS_PER_BATCH)
+ self._input_list = result.to_input_list()
+
+ if result.final_output:
+ self._research_result = result.final_output
+ break
+ except Exception as e:
+ error_msg = str(e)
+ if "Max turns" in error_msg:
+ logger.warning("Slack batch %d hit max turns, attempting synthesis", batch_num)
+ try:
+ synth_input = self._input_list + [
+ {"role": "user", "content": "Synthesize ALL your findings and provide your final comprehensive answer now."}
+ ]
+ synth_result = await Runner.run(agent, synth_input, max_turns=2)
+ self._research_result = synth_result.final_output or "Research incomplete."
+ except Exception:
+ self._research_result = f"Slack research exceeded turn limits after {batch_num + 1} batches."
+ break
+ else:
+ logger.warning("Slack research error in batch %d: %s", batch_num, e)
+ self._research_result = f"Slack research was partially completed but encountered an error: {e}"
+ break
+
+ if should_compact(self._input_list):
+ logger.info("Compacting slack conversation after batch %d", batch_num)
+ self._input_list = compact_tool_outputs(self._input_list)
+
+ if should_compact(self._input_list):
+ try:
+ summary_agent = new_summarization_agent()
+ summary_result = await Runner.run(summary_agent, self._input_list, max_turns=1)
+ if summary_result.final_output:
+ self._input_list = apply_summary_to_input_list(
+ self._input_list, summary_result.final_output, query
+ )
+ except Exception as se:
+ logger.warning("Summarization failed: %s", se)
+ else:
+ if not self._research_result:
+ self._research_result = "Slack research reached maximum iterations without producing a final result."
+
+ if parent_task_id and parent_agent_name:
+ await ActivityHelpers.execute_activity(
+ activity_name=ACPActivityName.EVENT_SEND,
+ request=EventSendParams(
+ agent_name=parent_agent_name,
+ task_id=parent_task_id,
+ content=TextContent(
+ author="agent",
+ content=json.dumps({
+ "event_type": "research_complete",
+ "source_agent": environment_variables.AGENT_NAME,
+ "child_task_id": params.task.id,
+ "result": self._research_result or "No results found.",
+ }),
+ ),
+ ),
+ response_type=Event,
+ start_to_close_timeout=timedelta(seconds=30),
+ )
+
+ self._complete_task = True
+
+ @workflow.run
+ async def on_task_create(self, params: CreateTaskParams) -> Dict[str, Any]:
+ logger.info("Slack research task created: %s", params)
+ self._task_id = params.task.id
+
+ await adk.messages.create(
+ task_id=params.task.id,
+ content=TextContent(
+ author="agent",
+ content="Slack Research agent initialized. Send a research query to begin.",
+ ),
+ )
+
+ await workflow.wait_condition(lambda: self._complete_task, timeout=None)
+ await workflow.wait_condition(lambda: workflow.all_handlers_finished())
+
+ return {"status": "complete", "result": self._research_result}
+
+ def _extract_payload(self, params: SendEventParams) -> dict:
+ if params.event.content and hasattr(params.event.content, "content"):
+ raw_content = params.event.content.content or ""
+ else:
+ raw_content = ""
+ if isinstance(raw_content, dict):
+ return raw_content
+ if isinstance(raw_content, str):
+ try:
+ return json.loads(raw_content)
+ except json.JSONDecodeError:
+ return {"raw_content": raw_content}
+ return {"raw_content": str(raw_content)}
diff --git a/examples/demos/deep_research/slack_researcher/pyproject.toml b/examples/demos/deep_research/slack_researcher/pyproject.toml
new file mode 100644
index 000000000..5b9c93d40
--- /dev/null
+++ b/examples/demos/deep_research/slack_researcher/pyproject.toml
@@ -0,0 +1,35 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "deep-research-slack"
+version = "0.1.0"
+description = "Slack research subagent using MCP server"
+requires-python = ">=3.12"
+dependencies = [
+ "agentex-sdk>=0.6.5",
+ "openai-agents>=0.4.2",
+ "temporalio>=1.18.2",
+ "scale-gp",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest",
+ "black",
+ "isort",
+ "flake8",
+ "debugpy>=1.8.15",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["project"]
+
+[tool.black]
+line-length = 88
+target-version = ['py312']
+
+[tool.isort]
+profile = "black"
+line_length = 88