diff --git a/astrbot/core/agent/runners/tool_loop_agent_runner.py b/astrbot/core/agent/runners/tool_loop_agent_runner.py index 3fb487cbe6..bd921de165 100644 --- a/astrbot/core/agent/runners/tool_loop_agent_runner.py +++ b/astrbot/core/agent/runners/tool_loop_agent_runner.py @@ -731,15 +731,47 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None: if not req.func_tool: return - if ( - self.tool_schema_mode == "skills_like" - and self._skill_like_raw_tool_set - ): - # in 'skills_like' mode, raw.func_tool is light schema, does not have handler - # so we need to get the tool from the raw tool set - func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name) - else: - func_tool = req.func_tool.get_tool(func_tool_name) + # First check if it's a dynamically created subagent tool + func_tool = None + run_context_context = getattr(self.run_context, "context", None) + if run_context_context is not None: + event = getattr(run_context_context, "event", None) + if event is not None: + session_id = getattr( + self.run_context.context.event, "unified_msg_origin", None + ) + if session_id: + try: + from astrbot.core.dynamic_subagent_manager import ( + DynamicSubAgentManager, + ) + + dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for h in dynamic_handoffs: + if ( + h.name == func_tool_name + or f"transfer_to_{h.name}" == func_tool_name + ): + func_tool = h + break + except Exception: + pass + + # If not found in dynamic tools, check regular tool sets + if func_tool is None: + if ( + self.tool_schema_mode == "skills_like" + and self._skill_like_raw_tool_set + ): + # in 'skills_like' mode, raw.func_tool is light schema, does not have handler + # so we need to get the tool from the raw tool set + func_tool = self._skill_like_raw_tool_set.get_tool( + func_tool_name + ) + else: + func_tool = req.func_tool.get_tool(func_tool_name) logger.info(f"使用工具:{func_tool_name},参数:{func_tool_args}") @@ -859,9 +891,53 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None: "The tool has returned a data type that is not supported." ) if result_parts: + result_content = "\n\n".join(result_parts) + # Check for dynamic tool creation marker + if result_content.startswith("__DYNAMIC_TOOL_CREATED__:"): + parts = result_content.split(":", 3) + if len(parts) >= 4: + new_tool_name = parts[1] + new_tool_obj_name = parts[2] + logger.info( + f"[DynamicSubAgent] Tool created: {new_tool_name}" + ) + # Try to add the new tool to func_tool set + try: + from astrbot.core.dynamic_subagent_manager import ( + DynamicSubAgentManager, + ) + + session_id = getattr( + self.run_context.context.event, + "unified_msg_origin", + None, + ) + if session_id: + handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for handoff in handoffs: + if ( + handoff.name == new_tool_obj_name + or handoff.name + == new_tool_name.replace( + "transfer_to_", "" + ) + ): + if self.req.func_tool: + self.req.func_tool.add_tool( + handoff + ) + logger.info( + f"[DynamicSubAgent] Added {handoff.name} to func_tool set" + ) + except Exception as e: + logger.warning( + f"[DynamicSubAgent] Failed to add dynamic tool: {e}" + ) _append_tool_call_result( func_tool_id, - "\n\n".join(result_parts), + result_content, ) elif resp is None: diff --git a/astrbot/core/astr_agent_tool_exec.py b/astrbot/core/astr_agent_tool_exec.py index 1fb4b03368..590ee20033 100644 --- a/astrbot/core/astr_agent_tool_exec.py +++ b/astrbot/core/astr_agent_tool_exec.py @@ -1,4 +1,5 @@ import asyncio +import datetime import inspect import json import traceback @@ -233,6 +234,21 @@ def _build_handoff_toolset( toolset.add_tool(runtime_tool) elif isinstance(tool_name_or_obj, FunctionTool): toolset.add_tool(tool_name_or_obj) + + # Always add send_shared_context tool for shared context feature + try: + from astrbot.core.dynamic_subagent_manager import ( + SEND_SHARED_CONTEXT_TOOL, + DynamicSubAgentManager, + ) + + session_id = event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if session and session.shared_context_enabled: + toolset.add_tool(SEND_SHARED_CONTEXT_TOOL) + except Exception as e: + logger.debug(f"[EnhancedSubAgent] Failed to add shared context tool: {e}") + return None if toolset.empty() else toolset @classmethod @@ -291,21 +307,116 @@ async def _execute_handoff( except Exception: continue + # 获取子代理的历史上下文 + agent_name = getattr(tool.agent, "name", None) + subagent_history = [] + if agent_name: + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + stored_history = DynamicSubAgentManager.get_subagent_history( + umo, agent_name + ) + if stored_history: + # 将历史消息转换为 Message 对象 + for hist_msg in stored_history: + try: + if isinstance(hist_msg, dict): + subagent_history.append( + Message.model_validate(hist_msg) + ) + elif isinstance(hist_msg, Message): + subagent_history.append(hist_msg) + except Exception: + continue + if subagent_history: + logger.info( + f"[SubAgentHistory] Loaded {len(subagent_history)} history messages for {agent_name}" + ) + + except Exception as e: + logger.warning( + f"[SubAgentHistory] Failed to load history for {agent_name}: {e}" + ) + prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {}) agent_max_step = int(prov_settings.get("max_agent_step", 30)) stream = prov_settings.get("streaming_response", False) + # 如果有历史上下文,合并到 contexts 中 + if subagent_history: + if contexts is None: + contexts = subagent_history + else: + contexts = subagent_history + contexts + + # 构建子代理的 system_prompt,添加 skills 提示词和公共上下文 + subagent_system_prompt = tool.agent.instructions or "" + if agent_name: + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + # 注入 skills + runtime = prov_settings.get("computer_use_runtime", "local") + skills_prompt = DynamicSubAgentManager.build_subagent_skills_prompt( + umo, agent_name, runtime + ) + if skills_prompt: + subagent_system_prompt += f"\n\n# Available Skills\n{skills_prompt}" + logger.info(f"[SubAgentSkills] Injected skills for {agent_name}") + + # 注入公共上下文 + shared_context_prompt = ( + DynamicSubAgentManager.build_shared_context_prompt(umo, agent_name) + ) + if shared_context_prompt: + subagent_system_prompt += f"\n{shared_context_prompt}" + logger.info( + f"[SubAgentSharedContext] Injected shared context for {agent_name}" + ) + + # 注入时间信息 + current_time = ( + datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M (%Z)") + ) + subagent_system_prompt += f"Current datetime: {current_time}" + + except Exception: + pass + llm_resp = await ctx.tool_loop_agent( event=event, chat_provider_id=prov_id, prompt=input_, image_urls=image_urls, - system_prompt=tool.agent.instructions, + system_prompt=subagent_system_prompt, tools=toolset, contexts=contexts, max_steps=agent_max_step, tool_call_timeout=run_context.tool_call_timeout, stream=stream, ) + + # 保存历史上下文 + try: + from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager + + agent_name = getattr(tool.agent, "name", None) + if agent_name: + # 构建当前对话的历史消息 + current_messages = [] + # 添加本轮用户输入 + current_messages.append({"role": "user", "content": input_}) + # 添加助手回复 + current_messages.append( + {"role": "assistant", "content": llm_resp.completion_text} + ) + if current_messages: + DynamicSubAgentManager.save_subagent_history( + umo, agent_name, current_messages + ) + except Exception: + pass # 不影响主流程 + yield mcp.types.CallToolResult( content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)] ) diff --git a/astrbot/core/astr_main_agent.py b/astrbot/core/astr_main_agent.py index 2b4a04907e..c9fe212767 100644 --- a/astrbot/core/astr_main_agent.py +++ b/astrbot/core/astr_main_agent.py @@ -143,6 +143,8 @@ class MainAgentBuildConfig: timezone: str | None = None max_quoted_fallback_images: int = 20 """Maximum number of images injected from quoted-message fallback extraction.""" + enhanced_subagent: dict = field(default_factory=dict) + """Log level for enhanced SubAgent: info or debug.""" @dataclass(slots=True) @@ -929,6 +931,98 @@ def _apply_llm_safety_mode(config: MainAgentBuildConfig, req: ProviderRequest) - ) +def _apply_enhanced_subagent_tools( + config: MainAgentBuildConfig, req: ProviderRequest, event: AstrMessageEvent +) -> None: + """Apply enhanced SubAgent tools and system prompt + + When enabled: + 1. Inject enhanced capability prompt into system prompt + 2. Register dynamic SubAgent management tools + 3. Register session's transfer_to_xxx tools + """ + if not config.enhanced_subagent.get("enabled", False): + return + + if req.func_tool is None: + req.func_tool = ToolSet() + + try: + from astrbot.core.dynamic_subagent_manager import ( + CLEANUP_DYNAMIC_SUBAGENT_TOOL, + CREATE_DYNAMIC_SUBAGENT_TOOL, + LIST_DYNAMIC_SUBAGENTS_TOOL, + PROTECT_SUBAGENT_TOOL, + SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT, + UNPROTECT_SUBAGENT_TOOL, + VIEW_SHARED_CONTEXT_TOOL, + DynamicSubAgentManager, + ) + from astrbot.core.subagent_logger import SubAgentLogger + + # Register dynamic SubAgent management tools + req.func_tool.add_tool(CREATE_DYNAMIC_SUBAGENT_TOOL) + req.func_tool.add_tool(CLEANUP_DYNAMIC_SUBAGENT_TOOL) + req.func_tool.add_tool(LIST_DYNAMIC_SUBAGENTS_TOOL) + req.func_tool.add_tool(PROTECT_SUBAGENT_TOOL) + req.func_tool.add_tool(UNPROTECT_SUBAGENT_TOOL) + req.func_tool.add_tool(VIEW_SHARED_CONTEXT_TOOL) + req.func_tool.add_tool(SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT) + + # Configure logger + SubAgentLogger.configure(level=config.enhanced_subagent.get("log_level")) + + # Configure DynamicSubAgentManager with settings + shared_context_enabled = config.enhanced_subagent.get( + "shared_context_enabled", False + ) + DynamicSubAgentManager.configure( + max_subagent_count=config.enhanced_subagent.get("max_subagent_count"), + auto_cleanup_per_turn=config.enhanced_subagent.get("auto_cleanup_per_turn"), + shared_context_enabled=shared_context_enabled, + shared_context_maxlen=config.enhanced_subagent.get( + "shared_context_maxlen", 200 + ), + ) + + # Enable shared context if configured + if shared_context_enabled: + DynamicSubAgentManager.set_shared_context_enabled( + event.unified_msg_origin, True + ) + + # Inject enhanced system prompt + dynamic_subagent_prompt = DynamicSubAgentManager.get_dynamic_subagent_prompt() + req.system_prompt = f"{req.system_prompt or ''}\n{dynamic_subagent_prompt}\n" + # Register existing handoff tools from config + plugin_context = getattr(event, "_plugin_context", None) + if plugin_context and plugin_context.subagent_orchestrator: + so = plugin_context.subagent_orchestrator + if hasattr(so, "handoffs"): + for tool in so.handoffs: + req.func_tool.add_tool(tool) + # Register dynamically created handoff tools + session_id = event.unified_msg_origin + dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session( + session_id + ) + for handoff in dynamic_handoffs: + req.func_tool.add_tool(handoff) + + # Check if we should cleanup subagents from previous turn + # This is done at the START of a new turn to clean up from previous turn + try: + DynamicSubAgentManager.cleanup_session_turn_start(session_id) + except Exception as e: + from astrbot import logger + + logger.warning(f"[DynamicSubAgent] Cleanup failed: {e}") + except ImportError as e: + from astrbot import logger + + logger.warning(f"[DynamicSubAgent] Cannot import module: {e}") + + def _apply_sandbox_tools( config: MainAgentBuildConfig, req: ProviderRequest, session_id: str ) -> None: @@ -1254,6 +1348,9 @@ async def build_main_agent( elif config.computer_use_runtime == "local": _apply_local_env_tools(req) + # Apply enhanced SubAgent tools + _apply_enhanced_subagent_tools(config, req, event) + agent_runner = AgentRunner() astr_agent_ctx = AstrAgentContext( context=plugin_context, diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 89aad5193e..39fee8fdba 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -195,6 +195,15 @@ ), "agents": [], }, + # 增强版动态SubAgent配置(独立于subagent_orchestrator) + "enhanced_subagent": { + "enabled": False, + "log_level": "debug", + "max_subagent_count": 3, + "auto_cleanup_per_turn": True, + "shared_context_enabled": False, + "shared_context_maxlen": 200, + }, "provider_stt_settings": { "enable": False, "provider_id": "", @@ -2446,17 +2455,17 @@ class ChatProviderTemplate(TypedDict): "mimo-tts-style-prompt": { "description": "风格提示词", "type": "string", - "hint": "会以 标签形式添加到待合成文本开头,用于控制语速、情绪、角色或风格,例如 开心、变快、孙悟空、悄悄话。可留空。", + "hint": "用于控制生成语音的说话风格、语气或情绪,例如温柔、活泼、沉稳等。可留空。", }, "mimo-tts-dialect": { "description": "方言", "type": "string", - "hint": "会与风格提示词一起写入开头的 标签中,例如 东北话、四川话、河南话、粤语。可留空。", + "hint": "指定生成语音时使用的方言或口音,例如四川话、粤语口音等。可留空。", }, "mimo-tts-seed-text": { "description": "种子文本", "type": "string", - "hint": "作为可选的 user 消息发送,用于辅助调节语气和风格,不会拼接到待合成文本中。", + "hint": "用于引导音色和说话方式的参考文本,会影响生成语音的表达风格。", }, "fishaudio-tts-character": { "description": "character", diff --git a/astrbot/core/dynamic_subagent_manager.py b/astrbot/core/dynamic_subagent_manager.py new file mode 100644 index 0000000000..ddba92523b --- /dev/null +++ b/astrbot/core/dynamic_subagent_manager.py @@ -0,0 +1,916 @@ +""" +Dynamic SubAgent Manager +Manages dynamically created subagents for task decomposition and parallel processing +""" + +from __future__ import annotations + +import asyncio +import re +import time +from dataclasses import dataclass, field + +from astrbot import logger +from astrbot.core.agent.agent import Agent +from astrbot.core.agent.handoff import HandoffTool +from astrbot.core.agent.tool import FunctionTool +from astrbot.core.subagent_logger import SubAgentLogger + + +@dataclass +class DynamicSubAgentConfig: + name: str + system_prompt: str = "" + tools: list | None = None + skills: list | None = None + provider_id: str | None = None + description: str = "" + max_steps: int = 30 + begin_dialogs: list | None = None + + +@dataclass +class SubAgentExecutionResult: + agent_name: str + success: bool + result: str + error: str | None = None + execution_time: float = 0.0 + + +@dataclass +class DynamicSubAgentSession: + session_id: str + agents: dict = field(default_factory=dict) + handoff_tools: dict = field(default_factory=dict) + results: dict = field(default_factory=dict) + enable_interaction: bool = False + created_at: float = 0.0 + protected_agents: set = field( + default_factory=set + ) # 若某个agent受到保护,则不会被自动清理 + agent_histories: dict = field(default_factory=dict) # 存储每个子代理的历史上下文 + shared_context: list = field(default_factory=list) # 公共上下文列表 + shared_context_enabled: bool = False # 是否启用公共上下文 + + +class DynamicSubAgentManager: + _sessions: dict = {} + _log_level: str = "info" + _max_subagent_count: int = 3 + _auto_cleanup_per_turn: bool = True + _shared_context_enabled: bool = False + _shared_context_maxlen: int = 200 + + @classmethod + def get_dynamic_subagent_prompt(cls): + if cls._shared_context_enabled: + shared_context_prompt = """- #### Collaborative Communication Mechanism + **Communication Tool description**: Inform the sub-agent that `send_shared_context` tool can be used for public channel communication, visible to all online sub-agents and the main agent. + **Communication protocol** : Clarify when to use this tool. + Progress reporting: Status updates must be sent when a task starts, encounters a blockage, or is completed. + Resource handover: After completing the task, send the generated file path and key conclusions to the public channel for use by downstream agents.""" + else: + shared_context_prompt = "" + + return f"""# Dynamic Sub-Agent Capability + +You are the Main Agent, and have the ability to dynamically create and manage sub-agents with isolated instructions, tools and skills. + +## When to create Sub-agents: + +- The task can be explicitly decomposed +- Requires multiple professional domain +- Processing very long contexts that exceeding the limitations of a single agent +- Parallel processing + +## Primary Workflow + +1. **Global planning**: + After receiving a user request, first formulate an overall execution plan and break it down into multiple subtask steps. + + Identify the dependencies between subtasks (who comes first and who comes second, who depends on whose output, and which sub-agents can run in parallel). + +2. **Sub-Agent Designing**: + Use the `create_dynamic_subagent` tool to create multiple sub-agents, and `transfer_to_{{name}}` tools will be created, where `{{name}}` is the name of a sub-agent. + +3. **Sub-Agent Delegating** + Use the `transfer_to_{{name}}` tool to delegate sub-agent + +## Creating Sub-agents with Name, System Prompt, Tools and Skills + +When creating a sub-agent, you should name it with **letters, numbers, and underscores**, no Chinese characters, punctuation marks, emojis or other characters not allowed in computer program. + +Meanwhile, you need to assign specific **System Prompt**, **Tools** and **Skills** to it. Each sub-agent's system prompt, tools and skills are completely isolated. + +``` +create_dynamic_subagent( + name="expert_analyst", + system_prompt="You are a data analyst...", + tools=["astrbot_execute_shell", "astrbot_execute_python"], + skills=["excel", "visualization", "data_analysis"] +) +``` + +**CAUTION**: **YOU MUST FOLLOW THE STEPS BELOW** to give well-designed system prompt and allocate tools and skills. + +### 1. When giving system prompt to a sub-agent, make it detailed, and you should include the following information to make them clear and standardized. + +- #### Character Design + + Define the name, professional identity, and personality traits of the sub-agent. + +- #### Global Tasks and Positioning + + **Overall task description**: Briefly summarize the user's ultimate goal, so that the sub-agent knows what it is striving for. + **Current step and position**: If the tasks are parallel, tell the sub-agent that there are other parallel sub-agents. If there are serial parts in the entire workflow, clearly inform the sub-agent of the current step in the entire process, as well as whether there are other sub-agents and what their respective tasks are (briefly described). + +> Example:“As Agent B_1, you are currently handling step 2 (of 3): *data cleaning*, an Agent B_2 is also working on step 2 in parallel. You are each responsible for handling two different parts of the data. There are also sub-agent A assigned for step 1: *data fetching* and sub-agent D assigned for step-3: *data labeling*”. + +{shared_context_prompt} + +- #### Specific task instructions + + Detailed execution steps for current sub-agent, specific paths for input data, and specific format requirements for output. + +- #### Behavioral Norm + + Safety: Dangerous operations are strictly prohibited. + Signature convention: Generated code/documents must be marked with the sub-agent's name and the time. + Working directory: By default, it is consistent with the main Agent's directory. + +### 2. Allocate available Tools and Skills + +Available tools and Skills depend on the system's configuration. You should check and list tools and skills, and assign tools and skills when creating sub-agents that need specialized capabilities. + +## Sub-agent Lifecycle + +Sub-agents are valid during single round conversation with the user, but they will be cleaned up automatically after you send the final answer to user. +If you wish to prevent a certain sub-agent from being automatically cleaned up, use `protect_subagent` tool. Also, you can use the `unprotect_subagent` tool to remove protection. +""".strip() + + @classmethod + def configure( + cls, + max_subagent_count: int = 10, + auto_cleanup_per_turn: bool = True, + shared_context_enabled: bool = False, + shared_context_maxlen: int = 200, + ) -> None: + """Configure DynamicSubAgentManager settings""" + cls._max_subagent_count = max_subagent_count + cls._auto_cleanup_per_turn = auto_cleanup_per_turn + cls._shared_context_enabled = shared_context_enabled + cls._shared_context_maxlen = shared_context_maxlen + + @classmethod + def cleanup_session_turn_start(cls, session_id: str) -> dict: + """Cleanup subagents from previous turn when a new turn starts""" + session = cls.get_session(session_id) + if not session: + return {"status": "no_session", "cleaned": []} + + cleaned = [] + for name in list(session.agents.keys()): + if name not in session.protected_agents: + cls._cleanup_single_subagent(session_id, name) + cleaned.append(name) + + # 如果启用了公共上下文,处理清理 + if session.shared_context_enabled: + remaining_unprotected = [ + a for a in session.agents.keys() if a not in session.protected_agents + ] + + if not remaining_unprotected and not session.protected_agents: + # 所有subagent都被清理,清除公共上下文 + cls.clear_shared_context(session_id) + SubAgentLogger.debug( + session_id, + "DynamicSubAgentManager:shared_context", + "All subagents cleaned, cleared shared context", + ) + else: + # 清理已删除agent的上下文 + for name in cleaned: + cls.cleanup_shared_context_by_agent(session_id, name) + + return {"status": "cleaned", "cleaned_agents": cleaned} + + @classmethod + def _cleanup_single_subagent(cls, session_id: str, agent_name: str) -> None: + """Internal method to cleanup a single subagent""" + session = cls.get_session(session_id) + if not session: + return + session.agents.pop(agent_name, None) + session.handoff_tools.pop(agent_name, None) + session.protected_agents.discard(agent_name) + session.agent_histories.pop(agent_name, None) + SubAgentLogger.info( + session_id, + "DynamicSubAgentManager:auto_cleanup", + f"Auto cleaned: {agent_name}", + agent_name, + ) + + @classmethod + def protect_subagent(cls, session_id: str, agent_name: str) -> None: + """Mark a subagent as protected from auto cleanup and history retention""" + session = cls.get_or_create_session(session_id) + session.protected_agents.add(agent_name) + SubAgentLogger.debug( + session_id, + "DynamicSubAgentManager:history", + f"Initialized history for protected agent: {agent_name}", + agent_name, + ) + + @classmethod + def save_subagent_history( + cls, session_id: str, agent_name: str, current_messages: list + ) -> None: + """Save conversation history for a subagent""" + session = cls.get_session(session_id) + if not session or agent_name not in session.protected_agents: + return + + if agent_name not in session.agent_histories: + session.agent_histories[agent_name] = [] + + # 追加新消息 + if isinstance(current_messages, list): + session.agent_histories[agent_name].extend(current_messages) + + SubAgentLogger.debug( + session_id, + "history_save", + f"Saved messages for {agent_name}, current len={len(session.agent_histories[agent_name])} ", + ) + + @classmethod + def get_subagent_history(cls, session_id: str, agent_name: str) -> list: + """Get conversation history for a subagent""" + session = cls.get_session(session_id) + if not session: + return [] + return session.agent_histories.get(agent_name, []) + + @classmethod + def build_subagent_skills_prompt( + cls, session_id: str, agent_name: str, runtime: str = "local" + ) -> str: + """Build skills prompt for a subagent based on its assigned skills""" + session = cls.get_session(session_id) + if not session: + return "" + + config = session.agents.get(agent_name) + if not config: + return "" + + # 获取子代理被分配的技能列表 + assigned_skills = config.skills + if not assigned_skills: + return "" + + try: + from astrbot.core.skills import SkillManager, build_skills_prompt + + skill_manager = SkillManager() + all_skills = skill_manager.list_skills(active_only=True, runtime=runtime) + + # 过滤只保留分配的技能 + allowed = set(assigned_skills) + filtered_skills = [s for s in all_skills if s.name in allowed] + + if filtered_skills: + return build_skills_prompt(filtered_skills) + except Exception as e: + from astrbot import logger + + logger.warning(f"[SubAgentSkills] Failed to build skills prompt: {e}") + + return "" + + @classmethod + def get_subagent_tools(cls, session_id: str, agent_name: str) -> list | None: + """Get the tools assigned to a subagent""" + session = cls.get_session(session_id) + if not session: + return None + config = session.agents.get(agent_name) + if not config: + return None + return config.tools + + @classmethod + def clear_subagent_history(cls, session_id: str, agent_name: str) -> None: + """Clear conversation history for a subagent""" + session = cls.get_session(session_id) + if not session: + return + if agent_name in session.agent_histories: + session.agent_histories.pop(agent_name) + SubAgentLogger.debug( + session_id, + "DynamicSubAgentManager:history", + f"Cleared history for: {agent_name}", + agent_name, + ) + + @classmethod + def set_shared_context_enabled(cls, session_id: str, enabled: bool) -> None: + """Enable or disable shared context for a session""" + session = cls.get_or_create_session(session_id) + session.shared_context_enabled = enabled + SubAgentLogger.info( + session_id, + "DynamicSubAgentManager:shared_context", + f"Shared context {'enabled' if enabled else 'disabled'}", + ) + + @classmethod + def add_shared_context( + cls, + session_id: str, + sender: str, + context_type: str, + content: str, + target: str = "all", + ) -> None: + """Add a message to the shared context + + Args: + session_id: Session ID + sender: Name of the agent sending the message + context_type: Type of context (status/message/system) + content: Content of the message + target: Target agent or "all" for broadcast + """ + + session = cls.get_or_create_session(session_id) + if not session.shared_context_enabled: + return + + if len(session.shared_context) >= cls._shared_context_maxlen: + # 删除最旧的消息 + session.shared_context = session.shared_context[ + -cls._shared_context_maxlen : + ] + logger.warning("Shared context exceeded limit, removed oldest messages") + + message = { + "type": context_type, # status, message, system + "sender": sender, + "target": target, + "content": content, + "timestamp": time.time(), + } + session.shared_context.append(message) + SubAgentLogger.debug( + session_id, + "shared_context", + f"[{context_type}] {sender} -> {target}: {content[:50]}...", + sender, + ) + + @classmethod + def get_shared_context(cls, session_id: str, filter_by_agent: str = None) -> list: + """Get shared context, optionally filtered by agent + + Args: + session_id: Session ID + filter_by_agent: If specified, only return messages from/to this agent (including "all") + """ + session = cls.get_session(session_id) + if not session or not session.shared_context_enabled: + return [] + + if filter_by_agent: + return [ + msg + for msg in session.shared_context + if msg["sender"] == filter_by_agent + or msg["target"] == filter_by_agent + or msg["target"] == "all" + ] + return session.shared_context.copy() + + @classmethod + def build_shared_context_prompt( + cls, session_id: str, agent_name: str = None + ) -> str: + """Build a formatted prompt from shared context for subagents + + Args: + session_id: Session ID + agent_name: Current agent name (to filter relevant messages) + """ + session = cls.get_session(session_id) + if ( + not session + or not session.shared_context_enabled + or not session.shared_context + ): + return "" + # Shared Context + lines = [""] + + lines.append( + """# You have a shared context that contains all subagent and system messages. +### You should pay attention to whether there are messages in the shared context before executing any instructions. +These may be messages sent to you by other subagents, messages you send to other subagents, or system instructions sent to all. +### Shared Context Message processing rules: +1. Message processing priority: Messages from System > Messages from other Agents; New messages > Old messages. +2. If the message is addressed to you and contains clear instructions, please follow them. If necessary, update your Status through the `send_shared_context` tool after completing the instructions. + *Example* 1: If your name is Bob, and there is a message from shared context. + > [14:11:16] [message] Alice -> Bob: What day is it today? Please reply. + > You should do: + - Function calling if required (Get the time today) + - Reply in the shared context using `send_shared_context` tool, and it may be like: + > [14:13:20] [message] Bob -> Alice: It's Monday today. + *Example* 2: If your name is Bob, and there is a message from System. + > [14:24:02] [system] System -> all: Attention to All agents : Please store all generated files in the **D:/temp** directory + > You can choose not to reply in the public context, but you should follow the instructions provided by the System + - Do your original task + - If there are file generated, put them to `D:/temp` directory + VERY IMPORTANT: If there is an instruction prefixed with `[system] System -> all` or `[system] System -> Your name`, **YOU MUST PRIORITIZE FOLLOWING IT**. +3. If the task corresponding to a certain message has been completed (which can be determined through the Status history), it can be ignored. +4. If you need to send a message to main agent, just output. If to other agents, use the `send_shared_context` tool. + ## < The following is shared context between all agents >""".strip() + ) + for msg in session.shared_context: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + sender = msg["sender"] + msg_type = msg["type"] + target = msg["target"] + content = msg["content"] + + if msg_type == "status": + lines.append(f"[{ts}] [Status] {sender}: {content}") + elif msg_type == "message": + lines.append(f"[{ts}] [Message] {sender} -> {target}: {content}") + elif msg_type == "system": + lines.append(f"[{ts}] [System] {content}") + + lines.append("## ") + return "\n".join(lines) + + @classmethod + def cleanup_shared_context_by_agent(cls, session_id: str, agent_name: str) -> None: + """Remove all messages from/to a specific agent from shared context""" + session = cls.get_session(session_id) + if not session: + return + + original_len = len(session.shared_context) + session.shared_context = [ + msg + for msg in session.shared_context + if msg["sender"] != agent_name and msg["target"] != agent_name + ] + removed = original_len - len(session.shared_context) + if removed > 0: + SubAgentLogger.debug( + session_id, + "DynamicSubAgentManager:shared_context", + f"Removed {removed} messages related to {agent_name}", + ) + + @classmethod + def clear_shared_context(cls, session_id: str) -> None: + """Clear all shared context""" + session = cls.get_session(session_id) + if not session: + return + session.shared_context.clear() + SubAgentLogger.debug( + session_id, + "DynamicSubAgentManager:shared_context", + "Cleared all shared context", + ) + + @classmethod + def is_protected(cls, session_id: str, agent_name: str) -> bool: + """Check if a subagent is protected from auto cleanup""" + session = cls.get_session(session_id) + if not session: + return False + return agent_name in session.protected_agents + + @classmethod + def set_log_level(cls, level: str) -> None: + cls._log_level = level.lower() + + @classmethod + def get_session(cls, session_id: str) -> DynamicSubAgentSession | None: + return cls._sessions.get(session_id) + + @classmethod + def get_or_create_session(cls, session_id: str) -> DynamicSubAgentSession: + if session_id not in cls._sessions: + cls._sessions[session_id] = DynamicSubAgentSession( + session_id=session_id, created_at=asyncio.get_event_loop().time() + ) + return cls._sessions[session_id] + + @classmethod + async def create_subagent( + cls, session_id: str, config: DynamicSubAgentConfig + ) -> tuple: + # Check max count limit + session = cls.get_or_create_session(session_id) + if ( + config.name not in session.agents + ): # Only count as new if not replacing existing + active_count = len( + [a for a in session.agents.keys() if a not in session.protected_agents] + ) + if active_count >= cls._max_subagent_count: + return ( + f"Error: Maximum number of subagents ({cls._max_subagent_count}) reached. More subagents is not allowed.", + None, + ) + + if config.name in session.agents: + session.handoff_tools.pop(config.name, None) + # When shared_context is enabled, the send_shared_context tool is allocated regardless of whether the main agent allocates the tool to the subagent + if session.shared_context_enabled: + if config.tools is None: + config.tools = [] + config.tools.append("send_shared_context") + session.agents[config.name] = config + + agent = Agent( + name=config.name, + instructions=config.system_prompt, + tools=config.tools, + ) + handoff_tool = HandoffTool( + agent=agent, + tool_description=config.description or f"Delegate to {config.name} agent", + ) + if config.provider_id: + handoff_tool.provider_id = config.provider_id + session.handoff_tools[config.name] = handoff_tool + # 初始化subagent的历史上下文 + if config.name not in session.agent_histories: + session.agent_histories[config.name] = [] + SubAgentLogger.info( + session_id, + "DynamicSubAgentManager:create", + f"Created: {config.name}", + config.name, + ) + return f"transfer_to_{config.name}", handoff_tool + + @classmethod + async def cleanup_session(cls, session_id: str) -> dict: + session = cls._sessions.pop(session_id, None) + if not session: + return {"status": "not_found", "cleaned_agents": []} + cleaned = list(session.agents.keys()) + for name in cleaned: + SubAgentLogger.info( + session_id, "DynamicSubAgentManager:cleanup", f"Cleaned: {name}", name + ) + return {"status": "cleaned", "cleaned_agents": cleaned} + + @classmethod + async def cleanup_subagent(cls, session_id: str, agent_name: str) -> bool: + session = cls.get_session(session_id) + if not session or agent_name not in session.agents: + return False + session.agents.pop(agent_name, None) + session.handoff_tools.pop(agent_name, None) + session.agent_histories.pop(agent_name, None) + # 清理公共上下文中包含该Agent的内容 + cls.cleanup_shared_context_by_agent(session_id, agent_name) + SubAgentLogger.info( + session_id, + "DynamicSubAgentManager:cleanup", + f"Cleaned: {agent_name}", + agent_name, + ) + return True + + @classmethod + def get_handoff_tools_for_session(cls, session_id: str) -> list: + session = cls.get_session(session_id) + if not session: + return [] + return list(session.handoff_tools.values()) + + +@dataclass +class CreateDynamicSubAgentTool(FunctionTool): + name: str = "create_dynamic_subagent" + description: str = ( + "Create a dynamic subagent. After creation, use transfer_to_{name} tool." + ) + + @staticmethod + def _default_parameters() -> dict: + return { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name"}, + "system_prompt": { + "type": "string", + "description": "Subagent persona and system_prompt", + }, + "tools": { + "type": "array", + "items": {"type": "string"}, + "description": "Tools available to subagent", + }, + "skills": { + "type": "array", + "items": {"type": "string"}, + "description": "Skills available to subagent (isolated per subagent)", + }, + }, + "required": ["name", "system_prompt"], + } + + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name"}, + "system_prompt": { + "type": "string", + "description": "Subagent system_prompt", + }, + "tools": {"type": "array", "items": {"type": "string"}}, + }, + "required": ["name", "system_prompt"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + + if not name: + return "Error: subagent name required" + # 验证名称格式:只允许英文字母、数字和下划线,长度限制 + if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]{0,31}$", name): + return "Error: SubAgent name must start with letter, contain only letters/numbers/underscores, max 32 characters" + # 检查是否包含危险字符 + dangerous_patterns = ["__", "system", "admin", "root", "super"] + if any(p in name.lower() for p in dangerous_patterns): + return f"Error: SubAgent name cannot contain reserved words like {dangerous_patterns}" + + system_prompt = kwargs.get("system_prompt", "") + tools = kwargs.get("tools") + skills = kwargs.get("skills") + + session_id = context.context.event.unified_msg_origin + config = DynamicSubAgentConfig( + name=name, system_prompt=system_prompt, tools=tools, skills=skills + ) + + tool_name, handoff_tool = await DynamicSubAgentManager.create_subagent( + session_id=session_id, config=config + ) + if handoff_tool: + return f"__DYNAMIC_TOOL_CREATED__:{tool_name}:{handoff_tool.name}:Created. Use {tool_name} to delegate." + else: + return f"__FAILED_TO_CREATE_DYNAMIC_TOOL__:{tool_name}" + + +@dataclass +class CleanupDynamicSubagentTool(FunctionTool): + name: str = "cleanup_dynamic_subagent" + description: str = "Clean up dynamic subagent." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + success = await DynamicSubAgentManager.cleanup_subagent(session_id, name) + return f"Cleaned {name}" if success else f"Not found: {name}" + + +@dataclass +class ListDynamicSubagentsTool(FunctionTool): + name: str = "list_dynamic_subagents" + description: str = "List dynamic subagents." + parameters: dict = field( + default_factory=lambda: {"type": "object", "properties": {}} + ) + + async def call(self, context, **kwargs) -> str: + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if not session or not session.agents: + return "No subagents" + lines = [] + for name in session.agents.keys(): + protected = "(protected)" if name in session.protected_agents else "" + lines.append(f" - {name} {protected}") + return "Subagents:\n" + "\n".join(lines) + + +@dataclass +class ProtectSubagentTool(FunctionTool): + """Tool to protect a subagent from auto cleanup""" + + name: str = "protect_subagent" + description: str = "Protect a subagent from automatic cleanup. Use this to prevent important subagents from being removed." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name to protect"}, + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_or_create_session(session_id) + if name not in session.agents: + return f"Error: Subagent {name} not found" + DynamicSubAgentManager.protect_subagent(session_id, name) + return f"Subagent {name} is now protected from auto cleanup" + + +@dataclass +class UnprotectSubagentTool(FunctionTool): + """Tool to remove protection from a subagent""" + + name: str = "unprotect_subagent" + description: str = "Remove protection from a subagent. It can then be auto cleaned." + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "name": {"type": "string", "description": "Subagent name to unprotect"}, + }, + "required": ["name"], + } + ) + + async def call(self, context, **kwargs) -> str: + name = kwargs.get("name", "") + if not name: + return "Error: name required" + session_id = context.context.event.unified_msg_origin + session = DynamicSubAgentManager.get_session(session_id) + if not session: + return "Error: No session found" + if name in session.protected_agents: + session.protected_agents.discard(name) + return f"Subagent {name} is no longer protected" + return f"Subagent {name} was not protected" + + +# Tool instances +CREATE_DYNAMIC_SUBAGENT_TOOL = CreateDynamicSubAgentTool() +CLEANUP_DYNAMIC_SUBAGENT_TOOL = CleanupDynamicSubagentTool() +LIST_DYNAMIC_SUBAGENTS_TOOL = ListDynamicSubagentsTool() +PROTECT_SUBAGENT_TOOL = ProtectSubagentTool() +UNPROTECT_SUBAGENT_TOOL = UnprotectSubagentTool() + + +# Shared Context Tools +@dataclass +class SendSharedContextToolForMainAgent(FunctionTool): + """Tool to send a message to the shared context (visible to all agents)""" + + name: str = "send_shared_context_for_main_agent" + description: str = """Send a message to the shared context that will be visible to all subagents and the main agent. You are the main agent, use this to share global information. +Types: 'message' (to other agents), 'system' (global announcements).""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "context_type": { + "type": "string", + "description": "Type of context: message (to other agents), system (global announcement)", + "enum": ["message", "system"], + }, + "content": {"type": "string", "description": "Content to share"}, + "target": { + "type": "string", + "description": "Target agent name or 'all' for broadcast", + "default": "all", + }, + }, + "required": ["context_type", "content", "target"], + } + ) + + async def call(self, context, **kwargs) -> str: + context_type = kwargs.get("context_type", "message") + content = kwargs.get("content", "") + target = kwargs.get("target", "all") + if not content: + return "Error: content is required" + session_id = context.context.event.unified_msg_origin + DynamicSubAgentManager.add_shared_context( + session_id, "System", context_type, content, target + ) + return f"Shared context updated: [{context_type}] System -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}" + + +@dataclass +class SendSharedContextTool(FunctionTool): + """Tool to send a message to the shared context (visible to all agents)""" + + name: str = "send_shared_context" + description: str = """Send a message to the shared context that will be visible to all subagents and the main agent. +Use this to share information, status updates, or coordinate with other agents. +Types: 'status' (your current task/progress), 'message' (to other agents)""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": { + "context_type": { + "type": "string", + "description": "Type of context: status (task progress), message (to other agents)", + "enum": ["status", "message"], + }, + "content": {"type": "string", "description": "Content to share"}, + "sender": { + "type": "string", + "description": "Sender agent name", + "default": "YourName", + }, + "target": { + "type": "string", + "description": "Target agent name or 'all' for broadcast", + "default": "all", + }, + }, + "required": ["context_type", "content", "sender", "target"], + } + ) + + async def call(self, context, **kwargs) -> str: + context_type = kwargs.get("context_type", "message") + content = kwargs.get("content", "") + target = kwargs.get("target", "all") + sender = kwargs.get("sender", "YourName") + if not content: + return "Error: content is required" + session_id = context.context.event.unified_msg_origin + DynamicSubAgentManager.add_shared_context( + session_id, sender, context_type, content, target + ) + return f"Shared context updated: [{context_type}] {sender} -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}" + + +@dataclass +class ViewSharedContextTool(FunctionTool): + """Tool to view the shared context (mainly for main agent)""" + + name: str = "view_shared_context" + description: str = """View the shared context between all agents. This shows all messages including status updates, +inter-agent messages, and system announcements.""" + parameters: dict = field( + default_factory=lambda: { + "type": "object", + "properties": {}, + } + ) + + async def call(self, context, **kwargs) -> str: + session_id = context.context.event.unified_msg_origin + shared_context = DynamicSubAgentManager.get_shared_context(session_id) + + if not shared_context: + return "Shared context is empty." + + lines = ["=== Shared Context ===\n"] + for msg in shared_context: + ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"])) + msg_type = msg["type"] + sender = msg["sender"] + target = msg["target"] + content = msg["content"] + lines.append(f"[{ts}] [{msg_type}] {sender} -> {target}:") + lines.append(f" {content}") + lines.append("") + + return "\n".join(lines) + + +# Shared context tool instances +SEND_SHARED_CONTEXT_TOOL = SendSharedContextTool() +SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT = SendSharedContextToolForMainAgent() +VIEW_SHARED_CONTEXT_TOOL = ViewSharedContextTool() diff --git a/astrbot/core/subagent_logger.py b/astrbot/core/subagent_logger.py new file mode 100644 index 0000000000..ebe7ab2de3 --- /dev/null +++ b/astrbot/core/subagent_logger.py @@ -0,0 +1,238 @@ +""" +SubAgent Logger Module +Provides logging capabilities for dynamic subagents +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from logging.handlers import RotatingFileHandler +from pathlib import Path + +from astrbot import logger as base_logger + + +class LogLevel(Enum): + DEBUG = "debug" + INFO = "info" + WARNING = "warning" + ERROR = "error" + + +class LogMode(Enum): + CONSOLE_ONLY = "console" + FILE_ONLY = "file" + BOTH = "both" + + +@dataclass +class SubAgentLogEntry: + timestamp: str + level: str + session_id: str + agent_name: str | None + event_type: str + message: str + details: dict | None = None + + def to_dict(self) -> dict: + return { + "timestamp": self.timestamp, + "level": self.level, + "session_id": self.session_id, + "agent_name": self.agent_name, + "event_type": self.event_type, + "message": self.message, + "details": self.details, + } + + +class SubAgentLogger: + """ + SubAgent Logger + Provides two log levels: INFO and DEBUG + """ + + _log_level: LogLevel = LogLevel.INFO + _log_mode: LogMode = LogMode.CONSOLE_ONLY + _log_dir: Path = field(default_factory=lambda: Path("logs/subagents")) + _session_logs: dict = {} + _file_handler = None + + EVENT_CREATE = "agent_create" + EVENT_START = "agent_start" + EVENT_END = "agent_end" + EVENT_ERROR = "agent_error" + EVENT_CLEANUP = "cleanup" + + @classmethod + def configure( + cls, level: str = "info", mode: str = "console", log_dir: str | None = None + ) -> None: + cls._log_level = LogLevel.DEBUG if level == "debug" else LogLevel.INFO + mode_map = { + "console": LogMode.CONSOLE_ONLY, + "file": LogMode.FILE_ONLY, + "both": LogMode.BOTH, + } + cls._log_mode = mode_map.get(mode.lower(), LogMode.CONSOLE_ONLY) + if log_dir: + cls._log_dir = Path(log_dir) + if cls._log_mode in [LogMode.FILE_ONLY, LogMode.BOTH]: + cls._setup_file_handler() + + @classmethod + def _setup_file_handler(cls) -> None: + if cls._file_handler: + return + try: + cls._log_dir.mkdir(parents=True, exist_ok=True) + log_file = ( + cls._log_dir / f"subagent_{datetime.now().strftime('%Y%m%d')}.log" + ) + + # 使用 RotatingFileHandler 自动轮转 + cls._file_handler = RotatingFileHandler( + log_file, + maxBytes=10 * 1024 * 1024, # 10MB + backupCount=5, + encoding="utf-8", + ) + + formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + cls._file_handler.setFormatter(formatter) + + fl = logging.getLogger("subagent_file") + fl.addHandler(cls._file_handler) + fl.setLevel(logging.DEBUG) + except Exception as e: + base_logger.warning(f"[SubAgentLogger] Setup error: {e}") + + @classmethod + def should_log(cls, level: str) -> bool: + if level == "debug": + return cls._log_level == LogLevel.DEBUG + return True + + @classmethod + def log( + cls, + session_id: str, + event_type: str, + message: str, + level: str = "info", + agent_name: str | None = None, + details: dict | None = None, + error_trace: str | None = None, + ) -> None: + if not cls.should_log(level): + return + entry = SubAgentLogEntry( + timestamp=datetime.now().isoformat(), + level=level.upper(), + session_id=session_id, + agent_name=agent_name, + event_type=event_type, + message=message, + details=details, + ) + if session_id not in cls._session_logs: + cls._session_logs[session_id] = [] + cls._session_logs[session_id].append(entry) + prefix = f"[{agent_name}]" if agent_name else "[Main]" + log_msg = f"{prefix} [{event_type}] {message}" + log_func = getattr(base_logger, level, base_logger.info) + log_func(log_msg) + + @classmethod + def info( + cls, + session_id: str, + event_type: str, + message: str, + agent_name: str | None = None, + details: dict | None = None, + ) -> None: + cls.log(session_id, event_type, message, "info", agent_name, details) + + @classmethod + def debug( + cls, + session_id: str, + event_type: str, + message: str, + agent_name: str | None = None, + details: dict | None = None, + ) -> None: + cls.log(session_id, event_type, message, "debug", agent_name, details) + + @classmethod + def error( + cls, + session_id: str, + event_type: str, + message: str, + agent_name: str | None = None, + details: dict | None = None, + ) -> None: + cls.log(session_id, event_type, message, "error", agent_name, details) + + @classmethod + def get_session_logs(cls, session_id: str) -> list[dict]: + return [log.to_dict() for log in cls._session_logs.get(session_id, [])] + + @classmethod + def shutdown(cls) -> None: + if cls._file_handler: + cls._file_handler.close() + + +def log_agent_create( + session_id: str, agent_name: str, details: dict | None = None +) -> None: + SubAgentLogger.info( + session_id, + SubAgentLogger.EVENT_CREATE, + f"Agent created: {agent_name}", + agent_name, + details, + ) + + +def log_agent_start(session_id: str, agent_name: str, task: str) -> None: + SubAgentLogger.info( + session_id, + SubAgentLogger.EVENT_START, + f"Agent started: {task[:80]}...", + agent_name, + ) + + +def log_agent_end(session_id: str, agent_name: str, result: str) -> None: + SubAgentLogger.info( + session_id, + SubAgentLogger.EVENT_END, + "Agent completed", + agent_name, + {"result": str(result)[:200]}, + ) + + +def log_agent_error(session_id: str, agent_name: str, error: str) -> None: + SubAgentLogger.error( + session_id, SubAgentLogger.EVENT_ERROR, f"Agent error: {error}", agent_name + ) + + +def log_cleanup(session_id: str, agent_name: str) -> None: + SubAgentLogger.info( + session_id, + SubAgentLogger.EVENT_CLEANUP, + f"Agent cleaned: {agent_name}", + agent_name, + )