Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/claude_agent_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from .client import ClaudeSDKClient
from .query import query
from .types import (
TERMINAL_TASK_STATUSES,
AgentDefinition,
AssistantMessage,
BaseHookInput,
Expand Down Expand Up @@ -131,6 +132,8 @@
TaskNotificationStatus,
TaskProgressMessage,
TaskStartedMessage,
TaskUpdatedMessage,
TaskUpdatedStatus,
TaskUsage,
TextBlock,
ThinkingBlock,
Expand Down Expand Up @@ -544,8 +547,11 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any:
"SystemMessage",
"TaskStartedMessage",
"TaskProgressMessage",
"TaskUpdatedMessage",
"TaskNotificationMessage",
"TaskNotificationStatus",
"TaskUpdatedStatus",
"TERMINAL_TASK_STATUSES",
"TaskUsage",
"ResultMessage",
"DeferredToolUse",
Expand Down
25 changes: 25 additions & 0 deletions src/claude_agent_sdk/_internal/message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TaskNotificationMessage,
TaskProgressMessage,
TaskStartedMessage,
TaskUpdatedMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
Expand Down Expand Up @@ -225,6 +226,30 @@ def parse_message(data: dict[str, Any]) -> Message | None:
tool_use_id=data.get("tool_use_id"),
usage=data.get("usage"),
)
case "task_updated":
# Terminal task completion sometimes arrives only as a
# task_updated patch (no separate task_notification), so
# expose it as a typed lifecycle message rather than a
# generic SystemMessage. Parsed defensively: the patch
# may omit uuid/session_id and parsing must never raise
# on a lifecycle event.
patch = data.get("patch")
if not isinstance(patch, dict):
patch = {}
# Terminal-ness is derived from patch.status; the CLI is
# assumed to set it on terminal transitions. A patch that
# carries only end_time/result/error (no status) is left
# non-terminal (status=None) — the full patch is still
# preserved on .patch for callers that need more.
return TaskUpdatedMessage(
subtype=subtype,
data=data,
task_id=data.get("task_id", ""),
patch=patch,
status=patch.get("status"),
session_id=data.get("session_id"),
uuid=data.get("uuid"),
)
case "mirror_error":
# SDK-synthesized via report_mirror_error — never emitted by the CLI subprocess.
return MirrorErrorMessage(
Expand Down
50 changes: 50 additions & 0 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,20 @@ class TaskUsage(TypedDict):
TaskNotificationStatus = Literal["completed", "failed", "stopped"]


# Possible status values reported inside a ``task_updated`` patch. "running"
# means the task is still in progress; the remaining values are terminal.
TaskUpdatedStatus = Literal["running", "completed", "failed", "stopped", "cancelled"]


# Task statuses that mean the task has finished and should be cleared from any
# "active task" tracking. Terminal task state can arrive through either a
# ``TaskNotificationMessage`` or a ``TaskUpdatedMessage``, so consumers should
# treat the ``status`` field of both the same way.
TERMINAL_TASK_STATUSES: frozenset[str] = frozenset(
{"completed", "failed", "stopped", "cancelled"}
)


@dataclass
class TaskStartedMessage(SystemMessage):
"""System message emitted when a task starts.
Expand Down Expand Up @@ -1095,6 +1109,12 @@ class TaskProgressMessage(SystemMessage):
class TaskNotificationMessage(SystemMessage):
"""System message emitted when a task completes, fails, or is stopped.

Note: not every terminal task emits this message. Background tasks may
instead report completion via a :class:`TaskUpdatedMessage` whose
``patch.status`` is terminal (see ``TERMINAL_TASK_STATUSES``). Consumers
tracking active task IDs should clear them on a terminal status from
*either* message — see :class:`TaskUpdatedMessage` for the full contract.

Subclass of SystemMessage: existing ``isinstance(msg, SystemMessage)`` and
``case SystemMessage()`` checks continue to match. The base ``subtype``
and ``data`` fields remain populated with the raw payload.
Expand All @@ -1110,6 +1130,36 @@ class TaskNotificationMessage(SystemMessage):
usage: TaskUsage | None = None


@dataclass
class TaskUpdatedMessage(SystemMessage):
"""System message emitted when a background task's state changes.

The CLI emits ``system``/``task_updated`` events as a task moves through its
lifecycle. ``patch`` carries the changed fields (e.g. ``status``,
``end_time``); when ``patch.status`` is terminal (see
``TERMINAL_TASK_STATUSES``) the task has finished — even when no separate
``TaskNotificationMessage`` is emitted for it.

Lifecycle contract: every task that emits a ``TaskStartedMessage`` also
emits a typed terminal event for the same ``task_id`` — either a
``TaskNotificationMessage`` (``status`` in completed/failed/stopped) or a
``TaskUpdatedMessage`` whose ``status`` is terminal. Consumers that track
active task IDs should clear them when they see a terminal status from
*either* message, regardless of whether completion happened via automatic
notification, ``TaskOutput``, or another background-task path.

Subclass of SystemMessage: existing ``isinstance(msg, SystemMessage)`` and
``case SystemMessage()`` checks continue to match. The base ``subtype`` and
``data`` fields remain populated with the raw payload.
"""

task_id: str
patch: dict[str, Any]
status: TaskUpdatedStatus | None = None
session_id: str | None = None
uuid: str | None = None


@dataclass
class MirrorErrorMessage(SystemMessage):
"""System message emitted when a :meth:`SessionStore.append` call fails.
Expand Down
130 changes: 130 additions & 0 deletions tests/test_message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from claude_agent_sdk._errors import MessageParseError
from claude_agent_sdk._internal.message_parser import parse_message
from claude_agent_sdk.types import (
TERMINAL_TASK_STATUSES,
AssistantMessage,
DeferredToolUse,
HookEventMessage,
Expand All @@ -16,6 +17,7 @@
TaskNotificationMessage,
TaskProgressMessage,
TaskStartedMessage,
TaskUpdatedMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
Expand Down Expand Up @@ -533,6 +535,133 @@ def test_parse_task_notification_message_optional_fields_absent(self):
assert message.usage is None
assert message.tool_use_id is None

def test_parse_task_updated_message_terminal(self):
"""task_updated with a terminal patch.status yields a TaskUpdatedMessage."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
"patch": {"status": "completed", "end_time": 1780405729183},
"uuid": "uuid-4",
"session_id": "session-1",
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.task_id == "task-abc"
assert message.patch == {"status": "completed", "end_time": 1780405729183}
assert message.status == "completed"
assert message.uuid == "uuid-4"
assert message.session_id == "session-1"
assert message.status in TERMINAL_TASK_STATUSES

def test_parse_task_updated_message_minimal(self):
"""task_updated with only task_id and patch (no uuid/session_id) still parses.

Mirrors the observed CLI shape where terminal completion arrives as a
bare task_updated patch — parsing must never raise on a lifecycle event.
"""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "b1m21w89v",
"patch": {"status": "completed", "end_time": 1780405729183},
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.task_id == "b1m21w89v"
assert message.status == "completed"
assert message.uuid is None
assert message.session_id is None

def test_parse_task_updated_message_running_not_terminal(self):
"""A non-terminal (running) task_updated parses; status is not terminal."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
"patch": {"status": "running"},
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.status == "running"
assert message.status not in TERMINAL_TASK_STATUSES

def test_parse_task_updated_message_no_patch(self):
"""task_updated with no patch parses with an empty patch and status None."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.patch == {}
assert message.status is None

def test_parse_task_updated_message_patch_without_status(self):
"""A patch lacking 'status' is preserved verbatim; status is None."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
"patch": {"end_time": 1780405729183},
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.patch == {"end_time": 1780405729183}
assert message.status is None

@pytest.mark.parametrize("patch", ["completed", ["completed"], 42, None])
def test_parse_task_updated_message_non_dict_patch(self, patch):
"""A non-dict (or missing) patch never raises; patch falls back to {}."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
"patch": patch,
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.patch == {}
assert message.status is None

@pytest.mark.parametrize("status", ["completed", "failed", "stopped", "cancelled"])
def test_parse_task_updated_message_terminal_statuses(self, status):
"""Every terminal patch.status is surfaced and recognized as terminal."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "task-abc",
"patch": {"status": status},
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert message.status == status
assert message.status in TERMINAL_TASK_STATUSES

def test_task_updated_backward_compat_isinstance(self):
"""Backward-compat: TaskUpdatedMessage is still a SystemMessage."""
data = {
"type": "system",
"subtype": "task_updated",
"task_id": "t1",
"patch": {"status": "failed"},
"uuid": "u1",
"session_id": "s1",
}
message = parse_message(data)
assert isinstance(message, TaskUpdatedMessage)
assert isinstance(message, SystemMessage)
# Base class fields still populated for legacy code paths.
assert message.subtype == "task_updated"
assert message.data == data
# match-case against SystemMessage still works.
matched = False
match message:
case SystemMessage():
matched = True
assert matched

def test_task_message_backward_compat_isinstance(self):
"""Backward-compat: typed task messages are still SystemMessage instances."""
started_data = {
Expand Down Expand Up @@ -603,6 +732,7 @@ def test_unknown_system_subtype_yields_generic(self):
assert not isinstance(message, TaskStartedMessage)
assert not isinstance(message, TaskProgressMessage)
assert not isinstance(message, TaskNotificationMessage)
assert not isinstance(message, TaskUpdatedMessage)
assert message.subtype == "some_future_subtype"
assert message.data == data

Expand Down