Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions python/packages/core/agent_framework/_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,20 @@ async def _get_stream() -> ResponseStream[ChatResponseUpdate, ChatResponse]:
**ctx["filtered_kwargs"],
)

def _propagate_conversation_id(update: AgentResponseUpdate) -> AgentResponseUpdate:
"""Eagerly propagate conversation_id to session as updates arrive.

This ensures session.service_session_id is set even when the user
only iterates the stream without calling get_final_response().
"""
if session is None:
return update
raw = update.raw_representation
conv_id = getattr(raw, "conversation_id", None) if raw else None
if isinstance(conv_id, str) and conv_id and session.service_session_id != conv_id:
session.service_session_id = conv_id
return update

return (
ResponseStream
.from_awaitable(_get_stream())
Expand All @@ -933,6 +947,7 @@ async def _get_stream() -> ResponseStream[ChatResponseUpdate, ChatResponse]:
self._finalize_response_updates, response_format=options.get("response_format") if options else None
),
)
.with_transform_hook(_propagate_conversation_id)
.with_result_hook(_post_hook)
)

Expand Down
29 changes: 26 additions & 3 deletions python/packages/core/agent_framework/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1659,17 +1659,34 @@ def _extract_tools(
return None


def _is_hosted_tool_approval(content: Any) -> bool:
"""Check if a function_approval_request/response is for a hosted tool (e.g. MCP).

Hosted tool approvals have a server_label in function_call.additional_properties
and should be passed through to the API untouched rather than processed locally.
"""
fc = getattr(content, "function_call", None)
if fc is None:
return False
ap = getattr(fc, "additional_properties", None)
return bool(ap and ap.get("server_label"))


def _collect_approval_responses(
messages: list[Message],
) -> dict[str, Content]:
"""Collect approval responses (both approved and rejected) from messages."""
"""Collect approval responses (both approved and rejected) from messages.

Hosted tool approvals (e.g. MCP) are excluded because they must be
forwarded to the API as-is rather than processed locally.
"""
from ._types import Message

fcc_todo: dict[str, Content] = {}
for msg in messages:
for content in msg.contents if isinstance(msg, Message) else []:
# Collect BOTH approved and rejected responses
if content.type == "function_approval_response":
# Collect BOTH approved and rejected responses, but skip hosted tool approvals
if content.type == "function_approval_response" and not _is_hosted_tool_approval(content):
fcc_todo[content.id] = content # type: ignore[attr-defined, index]
return fcc_todo

Expand Down Expand Up @@ -1698,6 +1715,9 @@ def _replace_approval_contents_with_results(

for content_idx, content in enumerate(msg.contents):
if content.type == "function_approval_request":
# Skip hosted tool approvals — they must pass through to the API unchanged
if _is_hosted_tool_approval(content):
continue
# Don't add the function call if it already exists (would create duplicate)
if content.function_call.call_id in existing_call_ids: # type: ignore[attr-defined, union-attr, operator]
# Just mark for removal - the function call already exists
Expand All @@ -1706,6 +1726,9 @@ def _replace_approval_contents_with_results(
# Put back the function call content only if it doesn't exist
msg.contents[content_idx] = content.function_call # type: ignore[attr-defined, assignment]
elif content.type == "function_approval_response":
# Skip hosted tool approvals — they must pass through to the API unchanged
if _is_hosted_tool_approval(content):
continue
if content.approved and content.id in fcc_todo: # type: ignore[attr-defined]
# Replace with the corresponding result
if result_idx < len(approved_function_results):
Expand Down
39 changes: 39 additions & 0 deletions python/packages/core/tests/core/test_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,45 @@ async def test_chat_client_agent_update_session_id_streaming_does_not_use_respon
assert session.service_session_id is None


async def test_chat_client_agent_streaming_session_id_set_without_get_final_response(
chat_client_base: SupportsChatGetResponse,
) -> None:
"""Test that session.service_session_id is set during streaming iteration.

This verifies the eager propagation of conversation_id via transform hook,
which is needed for multi-turn flows (e.g. hosted MCP approval) where the
user iterates the stream and then makes a follow-up call without calling
get_final_response().
"""
chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[Content.from_text("part 1")],
role="assistant",
response_id="resp_123",
conversation_id="resp_123",
),
ChatResponseUpdate(
contents=[Content.from_text(" part 2")],
role="assistant",
response_id="resp_123",
conversation_id="resp_123",
finish_reason="stop",
),
]
]

agent = Agent(client=chat_client_base)
session = agent.create_session()
assert session.service_session_id is None

# Only iterate — do NOT call get_final_response()
async for _ in agent.run("Hello", session=session, stream=True):
pass

assert session.service_session_id == "resp_123"


async def test_chat_client_agent_update_session_messages(client: SupportsChatGetResponse) -> None:
from agent_framework._sessions import InMemoryHistoryProvider

Expand Down
146 changes: 146 additions & 0 deletions python/packages/core/tests/core/test_function_invocation_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,152 @@ def local_func(arg1: str) -> str:
assert response is not None


async def test_hosted_mcp_approval_response_passthrough(chat_client_base: SupportsChatGetResponse):
"""Test that hosted MCP approval responses pass through without local execution.

When an MCP approval response has server_label in function_call.additional_properties,
the function invocation layer must not intercept it. The approval request/response
should be forwarded to the API as-is so the service can execute the hosted tool.
"""

@tool(name="local_function")
def local_func(arg1: str) -> str:
return f"Local {arg1}"

# Simulate an MCP approval request from the service (has server_label)
mcp_function_call = Content.from_function_call(
call_id="mcpr_abc123",
name="microsoft_docs_search",
arguments='{"query": "azure storage"}',
additional_properties={"server_label": "Microsoft_Learn_MCP"},
)
mcp_approval_request = Content.from_function_approval_request(
id="mcpr_abc123",
function_call=mcp_function_call,
)
mcp_approval_response = mcp_approval_request.to_function_approval_response(approved=True)

# The second call (after approval) should return a final response
chat_client_base.run_responses = [
ChatResponse(messages=Message(role="assistant", text="Here are the docs results.")),
]

# Build message list mimicking handle_approvals_without_session:
# [original query, assistant with approval_request, user with approval_response]
messages = [
Message(role="user", text="Search docs for azure storage"),
Message(role="assistant", contents=[mcp_approval_request]),
Message(role="user", contents=[mcp_approval_response]),
]

response = await chat_client_base.get_response(
messages,
tool_choice="auto",
tools=[local_func],
)

# The response should succeed without errors
assert response is not None
assert response.messages[0].text == "Here are the docs results."

# The approval contents should NOT have been mutated by the function invocation layer.
# The assistant message should still have the original approval_request content.
assistant_msg = messages[1]
assert assistant_msg.contents[0].type == "function_approval_request"
# The user message should still have the original approval_response content.
user_msg = messages[2]
assert user_msg.contents[0].type == "function_approval_response"


def test_is_hosted_tool_approval_with_server_label():
"""Test that _is_hosted_tool_approval returns True for MCP approvals with server_label."""
from agent_framework._tools import _is_hosted_tool_approval

mcp_fc = Content.from_function_call(
call_id="mcpr_abc",
name="docs_search",
arguments="{}",
additional_properties={"server_label": "Microsoft_Learn_MCP"},
)
mcp_request = Content.from_function_approval_request(id="mcpr_abc", function_call=mcp_fc)
mcp_response = mcp_request.to_function_approval_response(approved=True)

assert _is_hosted_tool_approval(mcp_request) is True
assert _is_hosted_tool_approval(mcp_response) is True


def test_is_hosted_tool_approval_without_server_label():
"""Test that _is_hosted_tool_approval returns False for regular tool approvals."""
from agent_framework._tools import _is_hosted_tool_approval

regular_fc = Content.from_function_call(call_id="call_1", name="my_func", arguments="{}")
regular_request = Content.from_function_approval_request(id="call_1", function_call=regular_fc)
regular_response = regular_request.to_function_approval_response(approved=True)

assert _is_hosted_tool_approval(regular_request) is False
assert _is_hosted_tool_approval(regular_response) is False
# Also test with None/non-content objects
assert _is_hosted_tool_approval(None) is False
assert _is_hosted_tool_approval("not a content") is False


async def test_mixed_local_and_hosted_approval_flow(chat_client_base: SupportsChatGetResponse):
"""Test that mixed local + hosted MCP approvals are handled correctly.

When a response contains both a local tool approval and a hosted MCP approval,
the local approval should be processed normally while the hosted MCP approval
should pass through untouched to the API.
"""

@tool(name="local_function", approval_mode="always_require")
def local_func(arg1: str) -> str:
return f"Local {arg1}"

# Simulate the LLM returning both a local function call and an MCP approval request
local_fc = Content.from_function_call(call_id="call_local", name="local_function", arguments='{"arg1": "test"}')
mcp_fc = Content.from_function_call(
call_id="mcpr_hosted",
name="microsoft_docs_search",
arguments='{"query": "azure"}',
additional_properties={"server_label": "Microsoft_Learn_MCP"},
)
mcp_approval_request = Content.from_function_approval_request(id="mcpr_hosted", function_call=mcp_fc)

# First response: LLM returns a local function call that needs approval
chat_client_base.run_responses = [
ChatResponse(messages=Message(role="assistant", contents=[local_fc])),
# After local approval + hosted approval, the final response
ChatResponse(messages=Message(role="assistant", text="Done with both tools.")),
]

# User approves the local function call
local_approval_response = Content.from_function_approval_response(
approved=True, id="call_local", function_call=local_fc
)
# User also has an MCP approval response (hosted)
mcp_approval_response = mcp_approval_request.to_function_approval_response(approved=True)

messages = [
Message(role="user", text="Search docs and run local"),
Message(role="assistant", contents=[local_fc, mcp_approval_request]),
Message(role="user", contents=[local_approval_response]),
Message(role="user", contents=[mcp_approval_response]),
]

response = await chat_client_base.get_response(
messages,
tool_choice="auto",
tools=[local_func],
)

assert response is not None
# The hosted MCP approval contents should NOT have been mutated
assistant_msg = messages[1]
assert assistant_msg.contents[1].type == "function_approval_request"
mcp_user_msg = messages[3]
assert mcp_user_msg.contents[0].type == "function_approval_response"


async def test_unapproved_tool_execution_raises_exception(chat_client_base: SupportsChatGetResponse):
"""Test that attempting to execute an unapproved tool raises ToolException."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,14 @@ async def handle_approvals_with_session_streaming(query: str, agent: "SupportsAg
"""Here we let the session deal with the previous responses, and we just rerun with the approval."""
from agent_framework import Message

new_input: list[Message] = []
new_input: list[Message | str] = [query]
new_input_added = True
while new_input_added:
new_input_added = False
new_input.append(Message(role="user", text=query))
async for update in agent.run(new_input, session=session, options={"store": True}, stream=True):
if update.user_input_requests:
# Reset input to only contain new approval responses for the next iteration
new_input = []
for user_input_needed in update.user_input_requests:
print(
f"User Input Request for function from {agent.name}: {user_input_needed.function_call.name}"
Expand Down Expand Up @@ -114,7 +115,8 @@ async def run_hosted_mcp_without_session_and_specific_approval() -> None:
async with Agent(
client=client,
name="DocsAgent",
instructions="You are a helpful assistant that can help with microsoft documentation questions.",
instructions="You are a helpful assistant that uses your MCP tool "
"to help with microsoft documentation questions.",
tools=[mcp_tool],
) as agent:
# First query
Expand Down Expand Up @@ -151,7 +153,8 @@ async def run_hosted_mcp_without_approval() -> None:
async with Agent(
client=client,
name="DocsAgent",
instructions="You are a helpful assistant that can help with microsoft documentation questions.",
instructions="You are a helpful assistant that uses your MCP tool "
"to help with Microsoft documentation questions.",
tools=[mcp_tool],
) as agent:
# First query
Expand Down Expand Up @@ -186,7 +189,8 @@ async def run_hosted_mcp_with_session() -> None:
async with Agent(
client=client,
name="DocsAgent",
instructions="You are a helpful assistant that can help with microsoft documentation questions.",
instructions="You are a helpful assistant that uses your MCP tool "
"to help with microsoft documentation questions.",
tools=[mcp_tool],
) as agent:
# First query
Expand Down Expand Up @@ -222,7 +226,8 @@ async def run_hosted_mcp_with_session_streaming() -> None:
async with Agent(
client=client,
name="DocsAgent",
instructions="You are a helpful assistant that can help with microsoft documentation questions.",
instructions="You are a helpful assistant that uses your MCP tool "
"to help with microsoft documentation questions.",
tools=[mcp_tool],
) as agent:
# First query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

from agent_framework import Agent
from agent_framework.openai import OpenAIResponsesClient
from dotenv import load_dotenv

"""
OpenAI Responses Client with Hosted MCP Example

This sample demonstrates integrating hosted Model Context Protocol (MCP) tools with
OpenAI Responses Client, including user approval workflows for function call security.
"""

load_dotenv() # Load environment variables from .env file if present
if TYPE_CHECKING:
from agent_framework import AgentSession, SupportsAgentRun

Expand Down Expand Up @@ -69,13 +70,14 @@ async def handle_approvals_with_session_streaming(query: str, agent: "SupportsAg
"""Here we let the session deal with the previous responses, and we just rerun with the approval."""
from agent_framework import Message

new_input: list[Message] = []
new_input: list[Message | str] = [query]
new_input_added = True
while new_input_added:
new_input_added = False
new_input.append(Message(role="user", text=query))
async for update in agent.run(new_input, session=session, stream=True, options={"store": True}):
if update.user_input_requests:
# Reset input to only contain new approval responses for the next iteration
new_input = []
for user_input_needed in update.user_input_requests:
print(
f"User Input Request for function from {agent.name}: {user_input_needed.function_call.name}"
Expand Down