From 764c6b33d5ab580c8c3f5343b443962b944dedf0 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Fri, 6 Feb 2026 10:11:47 -0800 Subject: [PATCH 1/4] access agent context in as tool scenarios --- .../packages/core/agent_framework/__init__.py | 1 + .../core/agent_framework/_agent_context.py | 62 +++++++++ .../packages/core/agent_framework/_agents.py | 20 ++- .../core/agent_framework/_middleware.py | 64 +++++++-- .../packages/core/agent_framework/_tools.py | 10 ++ .../core/tests/core/test_agent_context.py | 131 ++++++++++++++++++ 6 files changed, 276 insertions(+), 12 deletions(-) create mode 100644 python/packages/core/agent_framework/_agent_context.py create mode 100644 python/packages/core/tests/core/test_agent_context.py diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 1e408169d1..d4d0a84eb4 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -9,6 +9,7 @@ _version = "0.0.0" # Fallback for development mode __version__: Final[str] = _version +from ._agent_context import * # noqa: F403 from ._agents import * # noqa: F403 from ._clients import * # noqa: F403 from ._logging import * # noqa: F403 diff --git a/python/packages/core/agent_framework/_agent_context.py b/python/packages/core/agent_framework/_agent_context.py new file mode 100644 index 0000000000..ab14368076 --- /dev/null +++ b/python/packages/core/agent_framework/_agent_context.py @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager +from contextvars import ContextVar +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._middleware import AgentContext + +__all__ = [ + "agent_run_scope", + "get_current_agent_run_context", +] + +_current_agent_run_context: ContextVar[AgentContext | None] = ContextVar("agent_run_context", default=None) + + +def get_current_agent_run_context() -> AgentContext | None: + """Get the current agent run context, if any. + + Returns the AgentContext for the currently executing agent run, + or None if called outside of an agent run. This enables sub-agents + (invoked as tools) to access their parent agent's run context. + + Returns: + The current AgentContext, or None if not within an agent run. + + Examples: + .. code-block:: python + + from agent_framework import get_current_agent_run_context + + + @tool + async def my_tool() -> str: + parent_ctx = get_current_agent_run_context() + if parent_ctx and parent_ctx.thread: + # Access parent's conversation_id + conv_id = parent_ctx.thread.service_thread_id + return "done" + """ + return _current_agent_run_context.get() + + +@contextmanager +def agent_run_scope(context: AgentContext) -> Iterator[None]: + """Context manager to set the agent run context for the duration of a block. + + This is used internally by the agent framework to establish the current + run context. The context is automatically restored when the block exits. + + Args: + context: The AgentContext to set as current. + """ + token = _current_agent_run_context.set(context) + try: + yield + finally: + _current_agent_run_context.reset(token) diff --git a/python/packages/core/agent_framework/_agents.py b/python/packages/core/agent_framework/_agents.py index e42781da3c..e63581921b 100644 --- a/python/packages/core/agent_framework/_agents.py +++ b/python/packages/core/agent_framework/_agents.py @@ -450,10 +450,6 @@ def as_tool( Returns: A FunctionTool that can be used as a tool by other agents. - Raises: - TypeError: If the agent does not implement AgentProtocol. - ValueError: If the agent tool name cannot be determined. - Examples: .. code-block:: python @@ -874,6 +870,14 @@ async def _run_non_streaming() -> AgentResponse[Any]: options=options, kwargs=kwargs, ) + + # Update ambient context with resolved thread for sub-agent conversation_id inheritance + from ._agent_context import get_current_agent_run_context + + parent_context = get_current_agent_run_context() + if parent_context is not None and parent_context.thread is None: + parent_context.thread = ctx["thread"] + response = await self.chat_client.get_response( # type: ignore[call-overload] messages=ctx["thread_messages"], stream=False, @@ -945,6 +949,14 @@ async def _get_stream() -> ResponseStream[ChatResponseUpdate, ChatResponse]: kwargs=kwargs, ) ctx: _RunContext = ctx_holder["ctx"] # type: ignore[assignment] # Safe: we just assigned it + + # Update ambient context with resolved thread for sub-agent conversation_id inheritance + from ._agent_context import get_current_agent_run_context + + parent_context = get_current_agent_run_context() + if parent_context is not None and parent_context.thread is None: + parent_context.thread = ctx["thread"] + return self.chat_client.get_response( # type: ignore[call-overload, no-any-return] messages=ctx["thread_messages"], stream=True, diff --git a/python/packages/core/agent_framework/_middleware.py b/python/packages/core/agent_framework/_middleware.py index 7f6619570e..55445d053f 100644 --- a/python/packages/core/agent_framework/_middleware.py +++ b/python/packages/core/agent_framework/_middleware.py @@ -10,6 +10,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, Generic, Literal, TypeAlias, overload +from ._agent_context import agent_run_scope from ._clients import ChatClientProtocol from ._types import ( AgentResponse, @@ -1077,6 +1078,38 @@ def _middleware_handler( ) +def _wrap_stream_with_context( + inner_stream: ResponseStream[AgentResponseUpdate, AgentResponse], + context: AgentContext, +) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Wrap a ResponseStream to maintain agent run context during iteration. + + This ensures that `get_current_agent_run_context()` returns the correct context + when tools are invoked during streaming, including sub-agents wrapped as tools. + + Args: + inner_stream: The inner ResponseStream to wrap. + context: The AgentContext to maintain during iteration. + + Returns: + A new ResponseStream that maintains the context during iteration. + """ + + async def _iterate_with_context() -> AsyncIterable[AgentResponseUpdate]: + with agent_run_scope(context): + async for update in inner_stream: + yield update + + async def _finalize_with_context(updates: Sequence[AgentResponseUpdate]) -> AgentResponse: + with agent_run_scope(context): + return await inner_stream.get_final_response() + + return ResponseStream( + _iterate_with_context(), + finalizer=_finalize_with_context, + ) + + class AgentMiddlewareLayer: """Layer for agents to apply agent middleware around run execution.""" @@ -1157,10 +1190,7 @@ def run( combined_kwargs = dict(kwargs) combined_kwargs["middleware"] = combined_function_chat_middleware if combined_function_chat_middleware else None - # Execute with middleware if available - if not pipeline.has_middlewares: - return super().run(messages, stream=stream, thread=thread, options=options, **combined_kwargs) # type: ignore[misc, no-any-return] - + # Always create AgentContext for ambient access (enables sub-agents to access parent context) context = AgentContext( agent=self, # type: ignore[arg-type] messages=prepare_messages(messages), # type: ignore[arg-type] @@ -1170,11 +1200,29 @@ def run( kwargs=combined_kwargs, ) + # Execute without middleware pipeline if none configured + if not pipeline.has_middlewares: + if stream: + # For streaming, wrap to maintain context during iteration + inner_stream: ResponseStream[AgentResponseUpdate, AgentResponse] = super().run( # type: ignore[misc, assignment] + messages, stream=True, thread=thread, options=options, **combined_kwargs + ) + return _wrap_stream_with_context(inner_stream, context) + + async def _no_middleware_run() -> AgentResponse: + with agent_run_scope(context): + return await super(AgentMiddlewareLayer, self).run( # type: ignore[misc] + messages, stream=False, thread=thread, options=options, **combined_kwargs + ) + + return _no_middleware_run() + async def _execute() -> AgentResponse | ResponseStream[AgentResponseUpdate, AgentResponse] | None: - return await pipeline.execute( - context=context, - final_handler=self._middleware_handler, - ) + with agent_run_scope(context): + return await pipeline.execute( + context=context, + final_handler=self._middleware_handler, + ) if stream: # For streaming, wrap execution in ResponseStream.from_awaitable diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index 7e22b78827..c7b1f1f2c9 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -1748,6 +1748,9 @@ def _update_conversation_id( ) -> None: """Update kwargs and options with conversation id. + Also updates the ambient agent run context's thread if available, + enabling sub-agents (invoked as tools) to inherit the conversation_id. + Args: kwargs: The keyword arguments dictionary to update. conversation_id: The conversation ID to set, or None to skip. @@ -1764,6 +1767,13 @@ def _update_conversation_id( if options is not None: options["conversation_id"] = conversation_id + # Update the ambient context's thread so sub-agents can inherit the conversation_id + from ._agent_context import get_current_agent_run_context + + parent_context = get_current_agent_run_context() + if parent_context and parent_context.thread: + parent_context.thread.service_thread_id = conversation_id + async def _ensure_response_stream( stream_like: ResponseStream[Any, Any] | Awaitable[ResponseStream[Any, Any]], diff --git a/python/packages/core/tests/core/test_agent_context.py b/python/packages/core/tests/core/test_agent_context.py new file mode 100644 index 0000000000..7d6a041b37 --- /dev/null +++ b/python/packages/core/tests/core/test_agent_context.py @@ -0,0 +1,131 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for agent run context propagation. + +These tests verify that: +1. Agent run context is properly set during agent execution +2. Sub-agents can access parent context via get_current_agent_run_context() +3. Context is isolated between concurrent agent runs +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from unittest.mock import AsyncMock + +from agent_framework import ( + AgentContext, + ChatAgent, + ChatMessage, + ChatResponse, + agent_middleware, + get_current_agent_run_context, +) +from agent_framework._agent_context import agent_run_scope + +from .conftest import MockChatClient + + +class TestAgentContext: + """Tests for ambient agent run context.""" + + async def test_context_not_available_outside_run(self) -> None: + """Test that context is None outside of agent run.""" + context = get_current_agent_run_context() + assert context is None + + async def test_context_scope_properly_restored(self) -> None: + """Test that context is properly restored after scope exits.""" + mock_agent = AsyncMock() + mock_agent.name = "test" + + mock_context = AgentContext( + agent=mock_agent, + messages=[], + thread=None, + options=None, + stream=False, + kwargs={}, + ) + + # Before scope + assert get_current_agent_run_context() is None + + # Inside scope + with agent_run_scope(mock_context): + assert get_current_agent_run_context() is mock_context + + # After scope + assert get_current_agent_run_context() is None + + async def test_nested_context_scopes(self) -> None: + """Test that nested context scopes work correctly.""" + mock_agent1 = AsyncMock() + mock_agent1.name = "agent1" + mock_agent2 = AsyncMock() + mock_agent2.name = "agent2" + + context1 = AgentContext(agent=mock_agent1, messages=[], thread=None, options=None, stream=False, kwargs={}) + context2 = AgentContext(agent=mock_agent2, messages=[], thread=None, options=None, stream=False, kwargs={}) + + with agent_run_scope(context1): + assert get_current_agent_run_context() is context1 + + with agent_run_scope(context2): + assert get_current_agent_run_context() is context2 + + # After inner scope exits, should restore outer context + assert get_current_agent_run_context() is context1 + + assert get_current_agent_run_context() is None + + async def test_context_isolated_between_concurrent_tasks(self) -> None: + """Test that context is isolated between concurrent async tasks.""" + results: dict[str, AgentContext | None] = {} + mock_agent1 = AsyncMock() + mock_agent1.name = "agent1" + mock_agent2 = AsyncMock() + mock_agent2.name = "agent2" + + context1 = AgentContext(agent=mock_agent1, messages=[], thread=None, options=None, stream=False, kwargs={}) + context2 = AgentContext(agent=mock_agent2, messages=[], thread=None, options=None, stream=False, kwargs={}) + + async def task1() -> None: + with agent_run_scope(context1): + await asyncio.sleep(0.01) # Yield to other task + results["task1"] = get_current_agent_run_context() + + async def task2() -> None: + with agent_run_scope(context2): + await asyncio.sleep(0.01) # Yield to other task + results["task2"] = get_current_agent_run_context() + + await asyncio.gather(task1(), task2()) + + # Each task should see its own context + assert results["task1"] is context1 + assert results["task2"] is context2 + + async def test_context_available_in_middleware(self, chat_client: MockChatClient) -> None: + """Test that agent run context is available in agent middleware.""" + captured_context: AgentContext | None = None + + @agent_middleware + async def capture_middleware( + context: AgentContext, next_handler: Callable[[AgentContext], Awaitable[None]] + ) -> None: + nonlocal captured_context + # Get ambient context - should be the same as the passed context + captured_context = get_current_agent_run_context() + await next_handler(context) + + chat_client.responses = [ + ChatResponse(messages=[ChatMessage("assistant", ["Response"])]), + ] + + agent = ChatAgent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + await agent.run("Test message") + + assert captured_context is not None + assert captured_context.agent is agent From d3f7710a092e0d8b709a8afe4cf6e735f345d813 Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Fri, 6 Feb 2026 10:45:00 -0800 Subject: [PATCH 2/4] mypy fix + addressed copilot comments --- .../core/agent_framework/_agent_context.py | 1 - .../core/agent_framework/_middleware.py | 2 +- .../core/tests/core/test_agent_context.py | 27 +++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_agent_context.py b/python/packages/core/agent_framework/_agent_context.py index ab14368076..c4f5a216c9 100644 --- a/python/packages/core/agent_framework/_agent_context.py +++ b/python/packages/core/agent_framework/_agent_context.py @@ -11,7 +11,6 @@ from ._middleware import AgentContext __all__ = [ - "agent_run_scope", "get_current_agent_run_context", ] diff --git a/python/packages/core/agent_framework/_middleware.py b/python/packages/core/agent_framework/_middleware.py index 2467102b3d..df5b7aad5e 100644 --- a/python/packages/core/agent_framework/_middleware.py +++ b/python/packages/core/agent_framework/_middleware.py @@ -1211,7 +1211,7 @@ def run( async def _no_middleware_run() -> AgentResponse: with agent_run_scope(context): - return await super(AgentMiddlewareLayer, self).run( # type: ignore[misc] + return await super(AgentMiddlewareLayer, self).run( # type: ignore[misc, no-any-return] messages, stream=False, thread=thread, options=options, **combined_kwargs ) diff --git a/python/packages/core/tests/core/test_agent_context.py b/python/packages/core/tests/core/test_agent_context.py index 7d6a041b37..3c1a0a4759 100644 --- a/python/packages/core/tests/core/test_agent_context.py +++ b/python/packages/core/tests/core/test_agent_context.py @@ -129,3 +129,30 @@ async def capture_middleware( assert captured_context is not None assert captured_context.agent is agent + + async def test_context_available_in_streaming_middleware(self, chat_client: MockChatClient) -> None: + """Test that agent run context is available in middleware during streaming.""" + captured_context: AgentContext | None = None + + @agent_middleware + async def capture_middleware( + context: AgentContext, next_handler: Callable[[AgentContext], Awaitable[None]] + ) -> None: + nonlocal captured_context + captured_context = get_current_agent_run_context() + await next_handler(context) + + chat_client.responses = [ + ChatResponse(messages=[ChatMessage("assistant", ["Streaming response"])]), + ] + + agent = ChatAgent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + + # Run with streaming and consume the response + async for _update in agent.run("Test message", stream=True): + pass + + # Context should have been available in middleware + assert captured_context is not None + assert captured_context.agent is agent + assert captured_context.stream is True From d27bfac815e47b104997103acffb0085eacf18ed Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Wed, 11 Feb 2026 15:45:19 -0800 Subject: [PATCH 3/4] fixed imports --- .../packages/core/tests/core/test_agent_context.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/packages/core/tests/core/test_agent_context.py b/python/packages/core/tests/core/test_agent_context.py index 3c1a0a4759..7fa7a46c4d 100644 --- a/python/packages/core/tests/core/test_agent_context.py +++ b/python/packages/core/tests/core/test_agent_context.py @@ -15,10 +15,10 @@ from unittest.mock import AsyncMock from agent_framework import ( + Agent, AgentContext, - ChatAgent, - ChatMessage, ChatResponse, + Message, agent_middleware, get_current_agent_run_context, ) @@ -121,10 +121,10 @@ async def capture_middleware( await next_handler(context) chat_client.responses = [ - ChatResponse(messages=[ChatMessage("assistant", ["Response"])]), + ChatResponse(messages=[Message("assistant", ["Response"])]), ] - agent = ChatAgent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + agent = Agent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) await agent.run("Test message") assert captured_context is not None @@ -143,10 +143,10 @@ async def capture_middleware( await next_handler(context) chat_client.responses = [ - ChatResponse(messages=[ChatMessage("assistant", ["Streaming response"])]), + ChatResponse(messages=[Message("assistant", ["Streaming response"])]), ] - agent = ChatAgent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + agent = Agent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) # Run with streaming and consume the response async for _update in agent.run("Test message", stream=True): From 08b1ca024697a9c0d27f97ea6f2ce090201a375c Mon Sep 17 00:00:00 2001 From: Giles Odigwe Date: Thu, 12 Feb 2026 07:28:19 -0800 Subject: [PATCH 4/4] fixed tests --- .../core/tests/core/test_agent_context.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/python/packages/core/tests/core/test_agent_context.py b/python/packages/core/tests/core/test_agent_context.py index 7fa7a46c4d..212d4197af 100644 --- a/python/packages/core/tests/core/test_agent_context.py +++ b/python/packages/core/tests/core/test_agent_context.py @@ -107,46 +107,44 @@ async def task2() -> None: assert results["task1"] is context1 assert results["task2"] is context2 - async def test_context_available_in_middleware(self, chat_client: MockChatClient) -> None: + async def test_context_available_in_middleware(self) -> None: """Test that agent run context is available in agent middleware.""" + chat_client = MockChatClient() captured_context: AgentContext | None = None @agent_middleware - async def capture_middleware( - context: AgentContext, next_handler: Callable[[AgentContext], Awaitable[None]] - ) -> None: + async def capture_middleware(context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None: nonlocal captured_context # Get ambient context - should be the same as the passed context captured_context = get_current_agent_run_context() - await next_handler(context) + await call_next() chat_client.responses = [ ChatResponse(messages=[Message("assistant", ["Response"])]), ] - agent = Agent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + agent = Agent(client=chat_client, name="test_agent", middleware=[capture_middleware]) await agent.run("Test message") assert captured_context is not None assert captured_context.agent is agent - async def test_context_available_in_streaming_middleware(self, chat_client: MockChatClient) -> None: + async def test_context_available_in_streaming_middleware(self) -> None: """Test that agent run context is available in middleware during streaming.""" + chat_client = MockChatClient() captured_context: AgentContext | None = None @agent_middleware - async def capture_middleware( - context: AgentContext, next_handler: Callable[[AgentContext], Awaitable[None]] - ) -> None: + async def capture_middleware(context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None: nonlocal captured_context captured_context = get_current_agent_run_context() - await next_handler(context) + await call_next() chat_client.responses = [ ChatResponse(messages=[Message("assistant", ["Streaming response"])]), ] - agent = Agent(chat_client=chat_client, name="test_agent", middleware=[capture_middleware]) + agent = Agent(client=chat_client, name="test_agent", middleware=[capture_middleware]) # Run with streaming and consume the response async for _update in agent.run("Test message", stream=True):