Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **agent-framework-core**: Add long-running agents and background responses support with `ContinuationToken` TypedDict, `background` option in `OpenAIResponsesOptions`, and continuation token propagation through response types ([#2478](https://github.com/microsoft/agent-framework/issues/2478))

## [1.0.0b260130] - 2026-01-30

### Added
Expand Down
3 changes: 2 additions & 1 deletion python/packages/a2a/agent_framework_a2a/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import importlib.metadata

from ._agent import A2AAgent
from ._agent import A2AAgent, A2AContinuationToken

try:
__version__ = importlib.metadata.version(__name__)
Expand All @@ -11,5 +11,6 @@

__all__ = [
"A2AAgent",
"A2AContinuationToken",
"__version__",
]
202 changes: 126 additions & 76 deletions python/packages/a2a/agent_framework_a2a/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
FileWithUri,
Message,
Task,
TaskIdParams,
TaskQueryParams,
TaskState,
TextPart,
TransportProtocol,
Expand All @@ -34,21 +36,39 @@
BaseAgent,
ChatMessage,
Content,
ContinuationToken,
ResponseStream,
normalize_messages,
prepend_agent_framework_to_user_agent,
)
from agent_framework.observability import AgentTelemetryLayer

__all__ = ["A2AAgent"]
__all__ = ["A2AAgent", "A2AContinuationToken"]

URI_PATTERN = re.compile(r"^data:(?P<media_type>[^;]+);base64,(?P<base64_data>[A-Za-z0-9+/=]+)$")


class A2AContinuationToken(ContinuationToken):
"""Continuation token for A2A protocol long-running tasks."""

task_id: str
"""A2A protocol task ID."""
context_id: str
"""A2A protocol context ID."""


TERMINAL_TASK_STATES = [
TaskState.completed,
TaskState.failed,
TaskState.canceled,
TaskState.rejected,
]
IN_PROGRESS_TASK_STATES = [
TaskState.submitted,
TaskState.working,
TaskState.input_required,
TaskState.auth_required,
]


def _get_uri_data(uri: str) -> str:
Expand Down Expand Up @@ -193,6 +213,8 @@ def run(
*,
stream: Literal[False] = ...,
thread: AgentThread | None = None,
continuation_token: A2AContinuationToken | None = None,
background: bool = False,
**kwargs: Any,
) -> Awaitable[AgentResponse[Any]]: ...

Expand All @@ -203,6 +225,8 @@ def run(
*,
stream: Literal[True],
thread: AgentThread | None = None,
continuation_token: A2AContinuationToken | None = None,
background: bool = False,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

Expand All @@ -212,85 +236,62 @@ def run(
*,
stream: bool = False,
thread: AgentThread | None = None,
continuation_token: A2AContinuationToken | None = None,
background: bool = False,
**kwargs: Any,
) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
"""Get a response from the agent.

This method returns the final result of the agent's execution
as a single AgentResponse object when stream=False. When stream=True,
it returns a ResponseStream that yields AgentResponseUpdate objects.

Args:
messages: The message(s) to send to the agent.

Keyword Args:
stream: Whether to stream the response. Defaults to False.
thread: The conversation thread associated with the message(s).
continuation_token: Optional token to resume a long-running task
instead of starting a new one.
background: When True, in-progress task updates surface continuation
tokens so the caller can poll or resubscribe later. When False
(default), the agent internally waits for the task to complete.
kwargs: Additional keyword arguments.

Returns:
When stream=False: An Awaitable[AgentResponse].
When stream=True: A ResponseStream of AgentResponseUpdate items.
"""
if continuation_token is not None:
a2a_stream: AsyncIterable[Any] = self.client.resubscribe(TaskIdParams(id=continuation_token["task_id"]))
else:
normalized_messages = normalize_messages(messages)
a2a_message = self._prepare_message_for_a2a(normalized_messages[-1])
a2a_stream = self.client.send_message(a2a_message)

response = ResponseStream(
self._map_a2a_stream(a2a_stream, background=background),
finalizer=lambda updates: AgentResponse.from_updates(list(updates)),
)
if stream:
return self._run_stream_impl(messages=messages, thread=thread, **kwargs)
return self._run_impl(messages=messages, thread=thread, **kwargs)

async def _run_impl(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentResponse[Any]:
"""Non-streaming implementation of run."""
# Collect all updates and use framework to consolidate updates into response
updates: list[AgentResponseUpdate] = []
async for update in self._stream_updates(messages, thread=thread, **kwargs):
updates.append(update)
return AgentResponse.from_updates(updates)

def _run_stream_impl(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
"""Streaming implementation of run."""

def _finalize(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
return AgentResponse.from_updates(list(updates))

return ResponseStream(self._stream_updates(messages, thread=thread, **kwargs), finalizer=_finalize)
return response
return response.get_final_response()

async def _stream_updates(
async def _map_a2a_stream(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
a2a_stream: AsyncIterable[Any],
*,
thread: AgentThread | None = None,
**kwargs: Any,
background: bool = False,
) -> AsyncIterable[AgentResponseUpdate]:
"""Internal method to stream updates from the A2A agent.
"""Map raw A2A protocol items to AgentResponseUpdates.

Args:
messages: The message(s) to send to the agent.
a2a_stream: The raw A2A event stream.

Keyword Args:
thread: The conversation thread associated with the message(s).
kwargs: Additional keyword arguments.

Yields:
AgentResponseUpdate items from the A2A agent.
background: When False, in-progress task updates are silently
consumed (the stream keeps iterating until a terminal state).
When True, they are yielded with a continuation token.
"""
normalized_messages = normalize_messages(messages)
a2a_message = self._prepare_message_for_a2a(normalized_messages[-1])

response_stream = self.client.send_message(a2a_message)

async for item in response_stream:
async for item in a2a_stream:
if isinstance(item, Message):
# Process A2A Message
contents = self._parse_contents_from_a2a(item.parts)
yield AgentResponseUpdate(
contents=contents,
Expand All @@ -300,33 +301,82 @@ async def _stream_updates(
)
elif isinstance(item, tuple) and len(item) == 2: # ClientEvent = (Task, UpdateEvent)
task, _update_event = item
if isinstance(task, Task) and task.status.state in TERMINAL_TASK_STATES:
# Convert Task artifacts to ChatMessages and yield as separate updates
task_messages = self._parse_messages_from_task(task)
if task_messages:
for message in task_messages:
# Use the artifact's ID from raw_representation as message_id for unique identification
artifact_id = getattr(message.raw_representation, "artifact_id", None)
yield AgentResponseUpdate(
contents=message.contents,
role=message.role,
response_id=task.id,
message_id=artifact_id,
raw_representation=task,
)
else:
# Empty task
yield AgentResponseUpdate(
contents=[],
role="assistant",
response_id=task.id,
raw_representation=task,
)
if isinstance(task, Task):
for update in self._updates_from_task(task, background=background):
yield update
else:
# Unknown response type
msg = f"Only Message and Task responses are supported from A2A agents. Received: {type(item)}"
raise NotImplementedError(msg)

# ------------------------------------------------------------------
# Task helpers
# ------------------------------------------------------------------

def _updates_from_task(self, task: Task, *, background: bool = False) -> list[AgentResponseUpdate]:
"""Convert an A2A Task into AgentResponseUpdate(s).

Terminal tasks produce updates from their artifacts/history.
In-progress tasks produce a continuation token update only when
``background=True``; otherwise they are silently skipped so the
caller keeps consuming the stream until completion.
"""
if task.status.state in TERMINAL_TASK_STATES:
task_messages = self._parse_messages_from_task(task)
if task_messages:
return [
AgentResponseUpdate(
contents=message.contents,
role=message.role,
response_id=task.id,
message_id=getattr(message.raw_representation, "artifact_id", None),
raw_representation=task,
)
for message in task_messages
]
return [AgentResponseUpdate(contents=[], role="assistant", response_id=task.id, raw_representation=task)]

if background and task.status.state in IN_PROGRESS_TASK_STATES:
token = self._build_continuation_token(task)
return [
AgentResponseUpdate(
contents=[],
role="assistant",
response_id=task.id,
continuation_token=token,
raw_representation=task,
)
]

return []

@staticmethod
def _build_continuation_token(task: Task) -> A2AContinuationToken | None:
"""Build an A2AContinuationToken from an A2A Task if it is still in progress."""
if task.status.state in IN_PROGRESS_TASK_STATES:
return A2AContinuationToken(task_id=task.id, context_id=task.context_id)
return None

async def poll_task(self, continuation_token: A2AContinuationToken) -> AgentResponse[Any]:
"""Poll for the current state of a long-running A2A task.

Unlike ``run(continuation_token=...)``, which resubscribes to the SSE
stream, this performs a single request to retrieve the task state.

Args:
continuation_token: A token previously obtained from a response's
``continuation_token`` field.

Returns:
An AgentResponse whose ``continuation_token`` is set when the task
is still in progress, or ``None`` when it has reached a terminal state.
"""
task_id = continuation_token["task_id"]
task = await self.client.get_task(TaskQueryParams(id=task_id))
updates = self._updates_from_task(task, background=True)
if updates:
return AgentResponse.from_updates(updates)
return AgentResponse(messages=[], response_id=task.id, raw_representation=task)

def _prepare_message_for_a2a(self, message: ChatMessage) -> A2AMessage:
"""Prepare a ChatMessage for the A2A protocol.

Expand Down
Loading
Loading