diff --git a/astrbot/core/agent/runners/tool_loop_agent_runner.py b/astrbot/core/agent/runners/tool_loop_agent_runner.py
index 3fb487cbe6..bd921de165 100644
--- a/astrbot/core/agent/runners/tool_loop_agent_runner.py
+++ b/astrbot/core/agent/runners/tool_loop_agent_runner.py
@@ -731,15 +731,47 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None:
if not req.func_tool:
return
- if (
- self.tool_schema_mode == "skills_like"
- and self._skill_like_raw_tool_set
- ):
- # in 'skills_like' mode, raw.func_tool is light schema, does not have handler
- # so we need to get the tool from the raw tool set
- func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
- else:
- func_tool = req.func_tool.get_tool(func_tool_name)
+ # First check if it's a dynamically created subagent tool
+ func_tool = None
+ run_context_context = getattr(self.run_context, "context", None)
+ if run_context_context is not None:
+ event = getattr(run_context_context, "event", None)
+ if event is not None:
+ session_id = getattr(
+ self.run_context.context.event, "unified_msg_origin", None
+ )
+ if session_id:
+ try:
+ from astrbot.core.dynamic_subagent_manager import (
+ DynamicSubAgentManager,
+ )
+
+ dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(
+ session_id
+ )
+ for h in dynamic_handoffs:
+ if (
+ h.name == func_tool_name
+ or f"transfer_to_{h.name}" == func_tool_name
+ ):
+ func_tool = h
+ break
+ except Exception:
+ pass
+
+ # If not found in dynamic tools, check regular tool sets
+ if func_tool is None:
+ if (
+ self.tool_schema_mode == "skills_like"
+ and self._skill_like_raw_tool_set
+ ):
+ # in 'skills_like' mode, raw.func_tool is light schema, does not have handler
+ # so we need to get the tool from the raw tool set
+ func_tool = self._skill_like_raw_tool_set.get_tool(
+ func_tool_name
+ )
+ else:
+ func_tool = req.func_tool.get_tool(func_tool_name)
logger.info(f"使用工具:{func_tool_name},参数:{func_tool_args}")
@@ -859,9 +891,53 @@ def _append_tool_call_result(tool_call_id: str, content: str) -> None:
"The tool has returned a data type that is not supported."
)
if result_parts:
+ result_content = "\n\n".join(result_parts)
+ # Check for dynamic tool creation marker
+ if result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
+ parts = result_content.split(":", 3)
+ if len(parts) >= 4:
+ new_tool_name = parts[1]
+ new_tool_obj_name = parts[2]
+ logger.info(
+ f"[DynamicSubAgent] Tool created: {new_tool_name}"
+ )
+ # Try to add the new tool to func_tool set
+ try:
+ from astrbot.core.dynamic_subagent_manager import (
+ DynamicSubAgentManager,
+ )
+
+ session_id = getattr(
+ self.run_context.context.event,
+ "unified_msg_origin",
+ None,
+ )
+ if session_id:
+ handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(
+ session_id
+ )
+ for handoff in handoffs:
+ if (
+ handoff.name == new_tool_obj_name
+ or handoff.name
+ == new_tool_name.replace(
+ "transfer_to_", ""
+ )
+ ):
+ if self.req.func_tool:
+ self.req.func_tool.add_tool(
+ handoff
+ )
+ logger.info(
+ f"[DynamicSubAgent] Added {handoff.name} to func_tool set"
+ )
+ except Exception as e:
+ logger.warning(
+ f"[DynamicSubAgent] Failed to add dynamic tool: {e}"
+ )
_append_tool_call_result(
func_tool_id,
- "\n\n".join(result_parts),
+ result_content,
)
elif resp is None:
diff --git a/astrbot/core/astr_agent_tool_exec.py b/astrbot/core/astr_agent_tool_exec.py
index 1fb4b03368..590ee20033 100644
--- a/astrbot/core/astr_agent_tool_exec.py
+++ b/astrbot/core/astr_agent_tool_exec.py
@@ -1,4 +1,5 @@
import asyncio
+import datetime
import inspect
import json
import traceback
@@ -233,6 +234,21 @@ def _build_handoff_toolset(
toolset.add_tool(runtime_tool)
elif isinstance(tool_name_or_obj, FunctionTool):
toolset.add_tool(tool_name_or_obj)
+
+ # Always add send_shared_context tool for shared context feature
+ try:
+ from astrbot.core.dynamic_subagent_manager import (
+ SEND_SHARED_CONTEXT_TOOL,
+ DynamicSubAgentManager,
+ )
+
+ session_id = event.unified_msg_origin
+ session = DynamicSubAgentManager.get_session(session_id)
+ if session and session.shared_context_enabled:
+ toolset.add_tool(SEND_SHARED_CONTEXT_TOOL)
+ except Exception as e:
+ logger.debug(f"[EnhancedSubAgent] Failed to add shared context tool: {e}")
+
return None if toolset.empty() else toolset
@classmethod
@@ -291,21 +307,116 @@ async def _execute_handoff(
except Exception:
continue
+ # 获取子代理的历史上下文
+ agent_name = getattr(tool.agent, "name", None)
+ subagent_history = []
+ if agent_name:
+ try:
+ from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
+
+ stored_history = DynamicSubAgentManager.get_subagent_history(
+ umo, agent_name
+ )
+ if stored_history:
+ # 将历史消息转换为 Message 对象
+ for hist_msg in stored_history:
+ try:
+ if isinstance(hist_msg, dict):
+ subagent_history.append(
+ Message.model_validate(hist_msg)
+ )
+ elif isinstance(hist_msg, Message):
+ subagent_history.append(hist_msg)
+ except Exception:
+ continue
+ if subagent_history:
+ logger.info(
+ f"[SubAgentHistory] Loaded {len(subagent_history)} history messages for {agent_name}"
+ )
+
+ except Exception as e:
+ logger.warning(
+ f"[SubAgentHistory] Failed to load history for {agent_name}: {e}"
+ )
+
prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
agent_max_step = int(prov_settings.get("max_agent_step", 30))
stream = prov_settings.get("streaming_response", False)
+ # 如果有历史上下文,合并到 contexts 中
+ if subagent_history:
+ if contexts is None:
+ contexts = subagent_history
+ else:
+ contexts = subagent_history + contexts
+
+ # 构建子代理的 system_prompt,添加 skills 提示词和公共上下文
+ subagent_system_prompt = tool.agent.instructions or ""
+ if agent_name:
+ try:
+ from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
+
+ # 注入 skills
+ runtime = prov_settings.get("computer_use_runtime", "local")
+ skills_prompt = DynamicSubAgentManager.build_subagent_skills_prompt(
+ umo, agent_name, runtime
+ )
+ if skills_prompt:
+ subagent_system_prompt += f"\n\n# Available Skills\n{skills_prompt}"
+ logger.info(f"[SubAgentSkills] Injected skills for {agent_name}")
+
+ # 注入公共上下文
+ shared_context_prompt = (
+ DynamicSubAgentManager.build_shared_context_prompt(umo, agent_name)
+ )
+ if shared_context_prompt:
+ subagent_system_prompt += f"\n{shared_context_prompt}"
+ logger.info(
+ f"[SubAgentSharedContext] Injected shared context for {agent_name}"
+ )
+
+ # 注入时间信息
+ current_time = (
+ datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M (%Z)")
+ )
+ subagent_system_prompt += f"Current datetime: {current_time}"
+
+ except Exception:
+ pass
+
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
- system_prompt=tool.agent.instructions,
+ system_prompt=subagent_system_prompt,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
)
+
+ # 保存历史上下文
+ try:
+ from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
+
+ agent_name = getattr(tool.agent, "name", None)
+ if agent_name:
+ # 构建当前对话的历史消息
+ current_messages = []
+ # 添加本轮用户输入
+ current_messages.append({"role": "user", "content": input_})
+ # 添加助手回复
+ current_messages.append(
+ {"role": "assistant", "content": llm_resp.completion_text}
+ )
+ if current_messages:
+ DynamicSubAgentManager.save_subagent_history(
+ umo, agent_name, current_messages
+ )
+ except Exception:
+ pass # 不影响主流程
+
yield mcp.types.CallToolResult(
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
)
diff --git a/astrbot/core/astr_main_agent.py b/astrbot/core/astr_main_agent.py
index 2b4a04907e..c9fe212767 100644
--- a/astrbot/core/astr_main_agent.py
+++ b/astrbot/core/astr_main_agent.py
@@ -143,6 +143,8 @@ class MainAgentBuildConfig:
timezone: str | None = None
max_quoted_fallback_images: int = 20
"""Maximum number of images injected from quoted-message fallback extraction."""
+ enhanced_subagent: dict = field(default_factory=dict)
+ """Log level for enhanced SubAgent: info or debug."""
@dataclass(slots=True)
@@ -929,6 +931,98 @@ def _apply_llm_safety_mode(config: MainAgentBuildConfig, req: ProviderRequest) -
)
+def _apply_enhanced_subagent_tools(
+ config: MainAgentBuildConfig, req: ProviderRequest, event: AstrMessageEvent
+) -> None:
+ """Apply enhanced SubAgent tools and system prompt
+
+ When enabled:
+ 1. Inject enhanced capability prompt into system prompt
+ 2. Register dynamic SubAgent management tools
+ 3. Register session's transfer_to_xxx tools
+ """
+ if not config.enhanced_subagent.get("enabled", False):
+ return
+
+ if req.func_tool is None:
+ req.func_tool = ToolSet()
+
+ try:
+ from astrbot.core.dynamic_subagent_manager import (
+ CLEANUP_DYNAMIC_SUBAGENT_TOOL,
+ CREATE_DYNAMIC_SUBAGENT_TOOL,
+ LIST_DYNAMIC_SUBAGENTS_TOOL,
+ PROTECT_SUBAGENT_TOOL,
+ SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT,
+ UNPROTECT_SUBAGENT_TOOL,
+ VIEW_SHARED_CONTEXT_TOOL,
+ DynamicSubAgentManager,
+ )
+ from astrbot.core.subagent_logger import SubAgentLogger
+
+ # Register dynamic SubAgent management tools
+ req.func_tool.add_tool(CREATE_DYNAMIC_SUBAGENT_TOOL)
+ req.func_tool.add_tool(CLEANUP_DYNAMIC_SUBAGENT_TOOL)
+ req.func_tool.add_tool(LIST_DYNAMIC_SUBAGENTS_TOOL)
+ req.func_tool.add_tool(PROTECT_SUBAGENT_TOOL)
+ req.func_tool.add_tool(UNPROTECT_SUBAGENT_TOOL)
+ req.func_tool.add_tool(VIEW_SHARED_CONTEXT_TOOL)
+ req.func_tool.add_tool(SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT)
+
+ # Configure logger
+ SubAgentLogger.configure(level=config.enhanced_subagent.get("log_level"))
+
+ # Configure DynamicSubAgentManager with settings
+ shared_context_enabled = config.enhanced_subagent.get(
+ "shared_context_enabled", False
+ )
+ DynamicSubAgentManager.configure(
+ max_subagent_count=config.enhanced_subagent.get("max_subagent_count"),
+ auto_cleanup_per_turn=config.enhanced_subagent.get("auto_cleanup_per_turn"),
+ shared_context_enabled=shared_context_enabled,
+ shared_context_maxlen=config.enhanced_subagent.get(
+ "shared_context_maxlen", 200
+ ),
+ )
+
+ # Enable shared context if configured
+ if shared_context_enabled:
+ DynamicSubAgentManager.set_shared_context_enabled(
+ event.unified_msg_origin, True
+ )
+
+ # Inject enhanced system prompt
+ dynamic_subagent_prompt = DynamicSubAgentManager.get_dynamic_subagent_prompt()
+ req.system_prompt = f"{req.system_prompt or ''}\n{dynamic_subagent_prompt}\n"
+ # Register existing handoff tools from config
+ plugin_context = getattr(event, "_plugin_context", None)
+ if plugin_context and plugin_context.subagent_orchestrator:
+ so = plugin_context.subagent_orchestrator
+ if hasattr(so, "handoffs"):
+ for tool in so.handoffs:
+ req.func_tool.add_tool(tool)
+ # Register dynamically created handoff tools
+ session_id = event.unified_msg_origin
+ dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(
+ session_id
+ )
+ for handoff in dynamic_handoffs:
+ req.func_tool.add_tool(handoff)
+
+ # Check if we should cleanup subagents from previous turn
+ # This is done at the START of a new turn to clean up from previous turn
+ try:
+ DynamicSubAgentManager.cleanup_session_turn_start(session_id)
+ except Exception as e:
+ from astrbot import logger
+
+ logger.warning(f"[DynamicSubAgent] Cleanup failed: {e}")
+ except ImportError as e:
+ from astrbot import logger
+
+ logger.warning(f"[DynamicSubAgent] Cannot import module: {e}")
+
+
def _apply_sandbox_tools(
config: MainAgentBuildConfig, req: ProviderRequest, session_id: str
) -> None:
@@ -1254,6 +1348,9 @@ async def build_main_agent(
elif config.computer_use_runtime == "local":
_apply_local_env_tools(req)
+ # Apply enhanced SubAgent tools
+ _apply_enhanced_subagent_tools(config, req, event)
+
agent_runner = AgentRunner()
astr_agent_ctx = AstrAgentContext(
context=plugin_context,
diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py
index 89aad5193e..39fee8fdba 100644
--- a/astrbot/core/config/default.py
+++ b/astrbot/core/config/default.py
@@ -195,6 +195,15 @@
),
"agents": [],
},
+ # 增强版动态SubAgent配置(独立于subagent_orchestrator)
+ "enhanced_subagent": {
+ "enabled": False,
+ "log_level": "debug",
+ "max_subagent_count": 3,
+ "auto_cleanup_per_turn": True,
+ "shared_context_enabled": False,
+ "shared_context_maxlen": 200,
+ },
"provider_stt_settings": {
"enable": False,
"provider_id": "",
@@ -2446,17 +2455,17 @@ class ChatProviderTemplate(TypedDict):
"mimo-tts-style-prompt": {
"description": "风格提示词",
"type": "string",
- "hint": "会以 标签形式添加到待合成文本开头,用于控制语速、情绪、角色或风格,例如 开心、变快、孙悟空、悄悄话。可留空。",
+ "hint": "用于控制生成语音的说话风格、语气或情绪,例如温柔、活泼、沉稳等。可留空。",
},
"mimo-tts-dialect": {
"description": "方言",
"type": "string",
- "hint": "会与风格提示词一起写入开头的 标签中,例如 东北话、四川话、河南话、粤语。可留空。",
+ "hint": "指定生成语音时使用的方言或口音,例如四川话、粤语口音等。可留空。",
},
"mimo-tts-seed-text": {
"description": "种子文本",
"type": "string",
- "hint": "作为可选的 user 消息发送,用于辅助调节语气和风格,不会拼接到待合成文本中。",
+ "hint": "用于引导音色和说话方式的参考文本,会影响生成语音的表达风格。",
},
"fishaudio-tts-character": {
"description": "character",
diff --git a/astrbot/core/dynamic_subagent_manager.py b/astrbot/core/dynamic_subagent_manager.py
new file mode 100644
index 0000000000..ddba92523b
--- /dev/null
+++ b/astrbot/core/dynamic_subagent_manager.py
@@ -0,0 +1,916 @@
+"""
+Dynamic SubAgent Manager
+Manages dynamically created subagents for task decomposition and parallel processing
+"""
+
+from __future__ import annotations
+
+import asyncio
+import re
+import time
+from dataclasses import dataclass, field
+
+from astrbot import logger
+from astrbot.core.agent.agent import Agent
+from astrbot.core.agent.handoff import HandoffTool
+from astrbot.core.agent.tool import FunctionTool
+from astrbot.core.subagent_logger import SubAgentLogger
+
+
+@dataclass
+class DynamicSubAgentConfig:
+ name: str
+ system_prompt: str = ""
+ tools: list | None = None
+ skills: list | None = None
+ provider_id: str | None = None
+ description: str = ""
+ max_steps: int = 30
+ begin_dialogs: list | None = None
+
+
+@dataclass
+class SubAgentExecutionResult:
+ agent_name: str
+ success: bool
+ result: str
+ error: str | None = None
+ execution_time: float = 0.0
+
+
+@dataclass
+class DynamicSubAgentSession:
+ session_id: str
+ agents: dict = field(default_factory=dict)
+ handoff_tools: dict = field(default_factory=dict)
+ results: dict = field(default_factory=dict)
+ enable_interaction: bool = False
+ created_at: float = 0.0
+ protected_agents: set = field(
+ default_factory=set
+ ) # 若某个agent受到保护,则不会被自动清理
+ agent_histories: dict = field(default_factory=dict) # 存储每个子代理的历史上下文
+ shared_context: list = field(default_factory=list) # 公共上下文列表
+ shared_context_enabled: bool = False # 是否启用公共上下文
+
+
+class DynamicSubAgentManager:
+ _sessions: dict = {}
+ _log_level: str = "info"
+ _max_subagent_count: int = 3
+ _auto_cleanup_per_turn: bool = True
+ _shared_context_enabled: bool = False
+ _shared_context_maxlen: int = 200
+
+ @classmethod
+ def get_dynamic_subagent_prompt(cls):
+ if cls._shared_context_enabled:
+ shared_context_prompt = """- #### Collaborative Communication Mechanism
+ **Communication Tool description**: Inform the sub-agent that `send_shared_context` tool can be used for public channel communication, visible to all online sub-agents and the main agent.
+ **Communication protocol** : Clarify when to use this tool.
+ Progress reporting: Status updates must be sent when a task starts, encounters a blockage, or is completed.
+ Resource handover: After completing the task, send the generated file path and key conclusions to the public channel for use by downstream agents."""
+ else:
+ shared_context_prompt = ""
+
+ return f"""# Dynamic Sub-Agent Capability
+
+You are the Main Agent, and have the ability to dynamically create and manage sub-agents with isolated instructions, tools and skills.
+
+## When to create Sub-agents:
+
+- The task can be explicitly decomposed
+- Requires multiple professional domain
+- Processing very long contexts that exceeding the limitations of a single agent
+- Parallel processing
+
+## Primary Workflow
+
+1. **Global planning**:
+ After receiving a user request, first formulate an overall execution plan and break it down into multiple subtask steps.
+
+ Identify the dependencies between subtasks (who comes first and who comes second, who depends on whose output, and which sub-agents can run in parallel).
+
+2. **Sub-Agent Designing**:
+ Use the `create_dynamic_subagent` tool to create multiple sub-agents, and `transfer_to_{{name}}` tools will be created, where `{{name}}` is the name of a sub-agent.
+
+3. **Sub-Agent Delegating**
+ Use the `transfer_to_{{name}}` tool to delegate sub-agent
+
+## Creating Sub-agents with Name, System Prompt, Tools and Skills
+
+When creating a sub-agent, you should name it with **letters, numbers, and underscores**, no Chinese characters, punctuation marks, emojis or other characters not allowed in computer program.
+
+Meanwhile, you need to assign specific **System Prompt**, **Tools** and **Skills** to it. Each sub-agent's system prompt, tools and skills are completely isolated.
+
+```
+create_dynamic_subagent(
+ name="expert_analyst",
+ system_prompt="You are a data analyst...",
+ tools=["astrbot_execute_shell", "astrbot_execute_python"],
+ skills=["excel", "visualization", "data_analysis"]
+)
+```
+
+**CAUTION**: **YOU MUST FOLLOW THE STEPS BELOW** to give well-designed system prompt and allocate tools and skills.
+
+### 1. When giving system prompt to a sub-agent, make it detailed, and you should include the following information to make them clear and standardized.
+
+- #### Character Design
+
+ Define the name, professional identity, and personality traits of the sub-agent.
+
+- #### Global Tasks and Positioning
+
+ **Overall task description**: Briefly summarize the user's ultimate goal, so that the sub-agent knows what it is striving for.
+ **Current step and position**: If the tasks are parallel, tell the sub-agent that there are other parallel sub-agents. If there are serial parts in the entire workflow, clearly inform the sub-agent of the current step in the entire process, as well as whether there are other sub-agents and what their respective tasks are (briefly described).
+
+> Example:“As Agent B_1, you are currently handling step 2 (of 3): *data cleaning*, an Agent B_2 is also working on step 2 in parallel. You are each responsible for handling two different parts of the data. There are also sub-agent A assigned for step 1: *data fetching* and sub-agent D assigned for step-3: *data labeling*”.
+
+{shared_context_prompt}
+
+- #### Specific task instructions
+
+ Detailed execution steps for current sub-agent, specific paths for input data, and specific format requirements for output.
+
+- #### Behavioral Norm
+
+ Safety: Dangerous operations are strictly prohibited.
+ Signature convention: Generated code/documents must be marked with the sub-agent's name and the time.
+ Working directory: By default, it is consistent with the main Agent's directory.
+
+### 2. Allocate available Tools and Skills
+
+Available tools and Skills depend on the system's configuration. You should check and list tools and skills, and assign tools and skills when creating sub-agents that need specialized capabilities.
+
+## Sub-agent Lifecycle
+
+Sub-agents are valid during single round conversation with the user, but they will be cleaned up automatically after you send the final answer to user.
+If you wish to prevent a certain sub-agent from being automatically cleaned up, use `protect_subagent` tool. Also, you can use the `unprotect_subagent` tool to remove protection.
+""".strip()
+
+ @classmethod
+ def configure(
+ cls,
+ max_subagent_count: int = 10,
+ auto_cleanup_per_turn: bool = True,
+ shared_context_enabled: bool = False,
+ shared_context_maxlen: int = 200,
+ ) -> None:
+ """Configure DynamicSubAgentManager settings"""
+ cls._max_subagent_count = max_subagent_count
+ cls._auto_cleanup_per_turn = auto_cleanup_per_turn
+ cls._shared_context_enabled = shared_context_enabled
+ cls._shared_context_maxlen = shared_context_maxlen
+
+ @classmethod
+ def cleanup_session_turn_start(cls, session_id: str) -> dict:
+ """Cleanup subagents from previous turn when a new turn starts"""
+ session = cls.get_session(session_id)
+ if not session:
+ return {"status": "no_session", "cleaned": []}
+
+ cleaned = []
+ for name in list(session.agents.keys()):
+ if name not in session.protected_agents:
+ cls._cleanup_single_subagent(session_id, name)
+ cleaned.append(name)
+
+ # 如果启用了公共上下文,处理清理
+ if session.shared_context_enabled:
+ remaining_unprotected = [
+ a for a in session.agents.keys() if a not in session.protected_agents
+ ]
+
+ if not remaining_unprotected and not session.protected_agents:
+ # 所有subagent都被清理,清除公共上下文
+ cls.clear_shared_context(session_id)
+ SubAgentLogger.debug(
+ session_id,
+ "DynamicSubAgentManager:shared_context",
+ "All subagents cleaned, cleared shared context",
+ )
+ else:
+ # 清理已删除agent的上下文
+ for name in cleaned:
+ cls.cleanup_shared_context_by_agent(session_id, name)
+
+ return {"status": "cleaned", "cleaned_agents": cleaned}
+
+ @classmethod
+ def _cleanup_single_subagent(cls, session_id: str, agent_name: str) -> None:
+ """Internal method to cleanup a single subagent"""
+ session = cls.get_session(session_id)
+ if not session:
+ return
+ session.agents.pop(agent_name, None)
+ session.handoff_tools.pop(agent_name, None)
+ session.protected_agents.discard(agent_name)
+ session.agent_histories.pop(agent_name, None)
+ SubAgentLogger.info(
+ session_id,
+ "DynamicSubAgentManager:auto_cleanup",
+ f"Auto cleaned: {agent_name}",
+ agent_name,
+ )
+
+ @classmethod
+ def protect_subagent(cls, session_id: str, agent_name: str) -> None:
+ """Mark a subagent as protected from auto cleanup and history retention"""
+ session = cls.get_or_create_session(session_id)
+ session.protected_agents.add(agent_name)
+ SubAgentLogger.debug(
+ session_id,
+ "DynamicSubAgentManager:history",
+ f"Initialized history for protected agent: {agent_name}",
+ agent_name,
+ )
+
+ @classmethod
+ def save_subagent_history(
+ cls, session_id: str, agent_name: str, current_messages: list
+ ) -> None:
+ """Save conversation history for a subagent"""
+ session = cls.get_session(session_id)
+ if not session or agent_name not in session.protected_agents:
+ return
+
+ if agent_name not in session.agent_histories:
+ session.agent_histories[agent_name] = []
+
+ # 追加新消息
+ if isinstance(current_messages, list):
+ session.agent_histories[agent_name].extend(current_messages)
+
+ SubAgentLogger.debug(
+ session_id,
+ "history_save",
+ f"Saved messages for {agent_name}, current len={len(session.agent_histories[agent_name])} ",
+ )
+
+ @classmethod
+ def get_subagent_history(cls, session_id: str, agent_name: str) -> list:
+ """Get conversation history for a subagent"""
+ session = cls.get_session(session_id)
+ if not session:
+ return []
+ return session.agent_histories.get(agent_name, [])
+
+ @classmethod
+ def build_subagent_skills_prompt(
+ cls, session_id: str, agent_name: str, runtime: str = "local"
+ ) -> str:
+ """Build skills prompt for a subagent based on its assigned skills"""
+ session = cls.get_session(session_id)
+ if not session:
+ return ""
+
+ config = session.agents.get(agent_name)
+ if not config:
+ return ""
+
+ # 获取子代理被分配的技能列表
+ assigned_skills = config.skills
+ if not assigned_skills:
+ return ""
+
+ try:
+ from astrbot.core.skills import SkillManager, build_skills_prompt
+
+ skill_manager = SkillManager()
+ all_skills = skill_manager.list_skills(active_only=True, runtime=runtime)
+
+ # 过滤只保留分配的技能
+ allowed = set(assigned_skills)
+ filtered_skills = [s for s in all_skills if s.name in allowed]
+
+ if filtered_skills:
+ return build_skills_prompt(filtered_skills)
+ except Exception as e:
+ from astrbot import logger
+
+ logger.warning(f"[SubAgentSkills] Failed to build skills prompt: {e}")
+
+ return ""
+
+ @classmethod
+ def get_subagent_tools(cls, session_id: str, agent_name: str) -> list | None:
+ """Get the tools assigned to a subagent"""
+ session = cls.get_session(session_id)
+ if not session:
+ return None
+ config = session.agents.get(agent_name)
+ if not config:
+ return None
+ return config.tools
+
+ @classmethod
+ def clear_subagent_history(cls, session_id: str, agent_name: str) -> None:
+ """Clear conversation history for a subagent"""
+ session = cls.get_session(session_id)
+ if not session:
+ return
+ if agent_name in session.agent_histories:
+ session.agent_histories.pop(agent_name)
+ SubAgentLogger.debug(
+ session_id,
+ "DynamicSubAgentManager:history",
+ f"Cleared history for: {agent_name}",
+ agent_name,
+ )
+
+ @classmethod
+ def set_shared_context_enabled(cls, session_id: str, enabled: bool) -> None:
+ """Enable or disable shared context for a session"""
+ session = cls.get_or_create_session(session_id)
+ session.shared_context_enabled = enabled
+ SubAgentLogger.info(
+ session_id,
+ "DynamicSubAgentManager:shared_context",
+ f"Shared context {'enabled' if enabled else 'disabled'}",
+ )
+
+ @classmethod
+ def add_shared_context(
+ cls,
+ session_id: str,
+ sender: str,
+ context_type: str,
+ content: str,
+ target: str = "all",
+ ) -> None:
+ """Add a message to the shared context
+
+ Args:
+ session_id: Session ID
+ sender: Name of the agent sending the message
+ context_type: Type of context (status/message/system)
+ content: Content of the message
+ target: Target agent or "all" for broadcast
+ """
+
+ session = cls.get_or_create_session(session_id)
+ if not session.shared_context_enabled:
+ return
+
+ if len(session.shared_context) >= cls._shared_context_maxlen:
+ # 删除最旧的消息
+ session.shared_context = session.shared_context[
+ -cls._shared_context_maxlen :
+ ]
+ logger.warning("Shared context exceeded limit, removed oldest messages")
+
+ message = {
+ "type": context_type, # status, message, system
+ "sender": sender,
+ "target": target,
+ "content": content,
+ "timestamp": time.time(),
+ }
+ session.shared_context.append(message)
+ SubAgentLogger.debug(
+ session_id,
+ "shared_context",
+ f"[{context_type}] {sender} -> {target}: {content[:50]}...",
+ sender,
+ )
+
+ @classmethod
+ def get_shared_context(cls, session_id: str, filter_by_agent: str = None) -> list:
+ """Get shared context, optionally filtered by agent
+
+ Args:
+ session_id: Session ID
+ filter_by_agent: If specified, only return messages from/to this agent (including "all")
+ """
+ session = cls.get_session(session_id)
+ if not session or not session.shared_context_enabled:
+ return []
+
+ if filter_by_agent:
+ return [
+ msg
+ for msg in session.shared_context
+ if msg["sender"] == filter_by_agent
+ or msg["target"] == filter_by_agent
+ or msg["target"] == "all"
+ ]
+ return session.shared_context.copy()
+
+ @classmethod
+ def build_shared_context_prompt(
+ cls, session_id: str, agent_name: str = None
+ ) -> str:
+ """Build a formatted prompt from shared context for subagents
+
+ Args:
+ session_id: Session ID
+ agent_name: Current agent name (to filter relevant messages)
+ """
+ session = cls.get_session(session_id)
+ if (
+ not session
+ or not session.shared_context_enabled
+ or not session.shared_context
+ ):
+ return ""
+ # Shared Context
+ lines = [""]
+
+ lines.append(
+ """# You have a shared context that contains all subagent and system messages.
+### You should pay attention to whether there are messages in the shared context before executing any instructions.
+These may be messages sent to you by other subagents, messages you send to other subagents, or system instructions sent to all.
+### Shared Context Message processing rules:
+1. Message processing priority: Messages from System > Messages from other Agents; New messages > Old messages.
+2. If the message is addressed to you and contains clear instructions, please follow them. If necessary, update your Status through the `send_shared_context` tool after completing the instructions.
+ *Example* 1: If your name is Bob, and there is a message from shared context.
+ > [14:11:16] [message] Alice -> Bob: What day is it today? Please reply.
+ > You should do:
+ - Function calling if required (Get the time today)
+ - Reply in the shared context using `send_shared_context` tool, and it may be like:
+ > [14:13:20] [message] Bob -> Alice: It's Monday today.
+ *Example* 2: If your name is Bob, and there is a message from System.
+ > [14:24:02] [system] System -> all: Attention to All agents : Please store all generated files in the **D:/temp** directory
+ > You can choose not to reply in the public context, but you should follow the instructions provided by the System
+ - Do your original task
+ - If there are file generated, put them to `D:/temp` directory
+ VERY IMPORTANT: If there is an instruction prefixed with `[system] System -> all` or `[system] System -> Your name`, **YOU MUST PRIORITIZE FOLLOWING IT**.
+3. If the task corresponding to a certain message has been completed (which can be determined through the Status history), it can be ignored.
+4. If you need to send a message to main agent, just output. If to other agents, use the `send_shared_context` tool.
+ ## < The following is shared context between all agents >""".strip()
+ )
+ for msg in session.shared_context:
+ ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"]))
+ sender = msg["sender"]
+ msg_type = msg["type"]
+ target = msg["target"]
+ content = msg["content"]
+
+ if msg_type == "status":
+ lines.append(f"[{ts}] [Status] {sender}: {content}")
+ elif msg_type == "message":
+ lines.append(f"[{ts}] [Message] {sender} -> {target}: {content}")
+ elif msg_type == "system":
+ lines.append(f"[{ts}] [System] {content}")
+
+ lines.append("## End of shared context >")
+ return "\n".join(lines)
+
+ @classmethod
+ def cleanup_shared_context_by_agent(cls, session_id: str, agent_name: str) -> None:
+ """Remove all messages from/to a specific agent from shared context"""
+ session = cls.get_session(session_id)
+ if not session:
+ return
+
+ original_len = len(session.shared_context)
+ session.shared_context = [
+ msg
+ for msg in session.shared_context
+ if msg["sender"] != agent_name and msg["target"] != agent_name
+ ]
+ removed = original_len - len(session.shared_context)
+ if removed > 0:
+ SubAgentLogger.debug(
+ session_id,
+ "DynamicSubAgentManager:shared_context",
+ f"Removed {removed} messages related to {agent_name}",
+ )
+
+ @classmethod
+ def clear_shared_context(cls, session_id: str) -> None:
+ """Clear all shared context"""
+ session = cls.get_session(session_id)
+ if not session:
+ return
+ session.shared_context.clear()
+ SubAgentLogger.debug(
+ session_id,
+ "DynamicSubAgentManager:shared_context",
+ "Cleared all shared context",
+ )
+
+ @classmethod
+ def is_protected(cls, session_id: str, agent_name: str) -> bool:
+ """Check if a subagent is protected from auto cleanup"""
+ session = cls.get_session(session_id)
+ if not session:
+ return False
+ return agent_name in session.protected_agents
+
+ @classmethod
+ def set_log_level(cls, level: str) -> None:
+ cls._log_level = level.lower()
+
+ @classmethod
+ def get_session(cls, session_id: str) -> DynamicSubAgentSession | None:
+ return cls._sessions.get(session_id)
+
+ @classmethod
+ def get_or_create_session(cls, session_id: str) -> DynamicSubAgentSession:
+ if session_id not in cls._sessions:
+ cls._sessions[session_id] = DynamicSubAgentSession(
+ session_id=session_id, created_at=asyncio.get_event_loop().time()
+ )
+ return cls._sessions[session_id]
+
+ @classmethod
+ async def create_subagent(
+ cls, session_id: str, config: DynamicSubAgentConfig
+ ) -> tuple:
+ # Check max count limit
+ session = cls.get_or_create_session(session_id)
+ if (
+ config.name not in session.agents
+ ): # Only count as new if not replacing existing
+ active_count = len(
+ [a for a in session.agents.keys() if a not in session.protected_agents]
+ )
+ if active_count >= cls._max_subagent_count:
+ return (
+ f"Error: Maximum number of subagents ({cls._max_subagent_count}) reached. More subagents is not allowed.",
+ None,
+ )
+
+ if config.name in session.agents:
+ session.handoff_tools.pop(config.name, None)
+ # When shared_context is enabled, the send_shared_context tool is allocated regardless of whether the main agent allocates the tool to the subagent
+ if session.shared_context_enabled:
+ if config.tools is None:
+ config.tools = []
+ config.tools.append("send_shared_context")
+ session.agents[config.name] = config
+
+ agent = Agent(
+ name=config.name,
+ instructions=config.system_prompt,
+ tools=config.tools,
+ )
+ handoff_tool = HandoffTool(
+ agent=agent,
+ tool_description=config.description or f"Delegate to {config.name} agent",
+ )
+ if config.provider_id:
+ handoff_tool.provider_id = config.provider_id
+ session.handoff_tools[config.name] = handoff_tool
+ # 初始化subagent的历史上下文
+ if config.name not in session.agent_histories:
+ session.agent_histories[config.name] = []
+ SubAgentLogger.info(
+ session_id,
+ "DynamicSubAgentManager:create",
+ f"Created: {config.name}",
+ config.name,
+ )
+ return f"transfer_to_{config.name}", handoff_tool
+
+ @classmethod
+ async def cleanup_session(cls, session_id: str) -> dict:
+ session = cls._sessions.pop(session_id, None)
+ if not session:
+ return {"status": "not_found", "cleaned_agents": []}
+ cleaned = list(session.agents.keys())
+ for name in cleaned:
+ SubAgentLogger.info(
+ session_id, "DynamicSubAgentManager:cleanup", f"Cleaned: {name}", name
+ )
+ return {"status": "cleaned", "cleaned_agents": cleaned}
+
+ @classmethod
+ async def cleanup_subagent(cls, session_id: str, agent_name: str) -> bool:
+ session = cls.get_session(session_id)
+ if not session or agent_name not in session.agents:
+ return False
+ session.agents.pop(agent_name, None)
+ session.handoff_tools.pop(agent_name, None)
+ session.agent_histories.pop(agent_name, None)
+ # 清理公共上下文中包含该Agent的内容
+ cls.cleanup_shared_context_by_agent(session_id, agent_name)
+ SubAgentLogger.info(
+ session_id,
+ "DynamicSubAgentManager:cleanup",
+ f"Cleaned: {agent_name}",
+ agent_name,
+ )
+ return True
+
+ @classmethod
+ def get_handoff_tools_for_session(cls, session_id: str) -> list:
+ session = cls.get_session(session_id)
+ if not session:
+ return []
+ return list(session.handoff_tools.values())
+
+
+@dataclass
+class CreateDynamicSubAgentTool(FunctionTool):
+ name: str = "create_dynamic_subagent"
+ description: str = (
+ "Create a dynamic subagent. After creation, use transfer_to_{name} tool."
+ )
+
+ @staticmethod
+ def _default_parameters() -> dict:
+ return {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string", "description": "Subagent name"},
+ "system_prompt": {
+ "type": "string",
+ "description": "Subagent persona and system_prompt",
+ },
+ "tools": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "Tools available to subagent",
+ },
+ "skills": {
+ "type": "array",
+ "items": {"type": "string"},
+ "description": "Skills available to subagent (isolated per subagent)",
+ },
+ },
+ "required": ["name", "system_prompt"],
+ }
+
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string", "description": "Subagent name"},
+ "system_prompt": {
+ "type": "string",
+ "description": "Subagent system_prompt",
+ },
+ "tools": {"type": "array", "items": {"type": "string"}},
+ },
+ "required": ["name", "system_prompt"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ name = kwargs.get("name", "")
+
+ if not name:
+ return "Error: subagent name required"
+ # 验证名称格式:只允许英文字母、数字和下划线,长度限制
+ if not re.match(r"^[a-zA-Z][a-zA-Z0-9_]{0,31}$", name):
+ return "Error: SubAgent name must start with letter, contain only letters/numbers/underscores, max 32 characters"
+ # 检查是否包含危险字符
+ dangerous_patterns = ["__", "system", "admin", "root", "super"]
+ if any(p in name.lower() for p in dangerous_patterns):
+ return f"Error: SubAgent name cannot contain reserved words like {dangerous_patterns}"
+
+ system_prompt = kwargs.get("system_prompt", "")
+ tools = kwargs.get("tools")
+ skills = kwargs.get("skills")
+
+ session_id = context.context.event.unified_msg_origin
+ config = DynamicSubAgentConfig(
+ name=name, system_prompt=system_prompt, tools=tools, skills=skills
+ )
+
+ tool_name, handoff_tool = await DynamicSubAgentManager.create_subagent(
+ session_id=session_id, config=config
+ )
+ if handoff_tool:
+ return f"__DYNAMIC_TOOL_CREATED__:{tool_name}:{handoff_tool.name}:Created. Use {tool_name} to delegate."
+ else:
+ return f"__FAILED_TO_CREATE_DYNAMIC_TOOL__:{tool_name}"
+
+
+@dataclass
+class CleanupDynamicSubagentTool(FunctionTool):
+ name: str = "cleanup_dynamic_subagent"
+ description: str = "Clean up dynamic subagent."
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {"name": {"type": "string"}},
+ "required": ["name"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ name = kwargs.get("name", "")
+ if not name:
+ return "Error: name required"
+ session_id = context.context.event.unified_msg_origin
+ success = await DynamicSubAgentManager.cleanup_subagent(session_id, name)
+ return f"Cleaned {name}" if success else f"Not found: {name}"
+
+
+@dataclass
+class ListDynamicSubagentsTool(FunctionTool):
+ name: str = "list_dynamic_subagents"
+ description: str = "List dynamic subagents."
+ parameters: dict = field(
+ default_factory=lambda: {"type": "object", "properties": {}}
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ session_id = context.context.event.unified_msg_origin
+ session = DynamicSubAgentManager.get_session(session_id)
+ if not session or not session.agents:
+ return "No subagents"
+ lines = []
+ for name in session.agents.keys():
+ protected = "(protected)" if name in session.protected_agents else ""
+ lines.append(f" - {name} {protected}")
+ return "Subagents:\n" + "\n".join(lines)
+
+
+@dataclass
+class ProtectSubagentTool(FunctionTool):
+ """Tool to protect a subagent from auto cleanup"""
+
+ name: str = "protect_subagent"
+ description: str = "Protect a subagent from automatic cleanup. Use this to prevent important subagents from being removed."
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string", "description": "Subagent name to protect"},
+ },
+ "required": ["name"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ name = kwargs.get("name", "")
+ if not name:
+ return "Error: name required"
+ session_id = context.context.event.unified_msg_origin
+ session = DynamicSubAgentManager.get_or_create_session(session_id)
+ if name not in session.agents:
+ return f"Error: Subagent {name} not found"
+ DynamicSubAgentManager.protect_subagent(session_id, name)
+ return f"Subagent {name} is now protected from auto cleanup"
+
+
+@dataclass
+class UnprotectSubagentTool(FunctionTool):
+ """Tool to remove protection from a subagent"""
+
+ name: str = "unprotect_subagent"
+ description: str = "Remove protection from a subagent. It can then be auto cleaned."
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string", "description": "Subagent name to unprotect"},
+ },
+ "required": ["name"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ name = kwargs.get("name", "")
+ if not name:
+ return "Error: name required"
+ session_id = context.context.event.unified_msg_origin
+ session = DynamicSubAgentManager.get_session(session_id)
+ if not session:
+ return "Error: No session found"
+ if name in session.protected_agents:
+ session.protected_agents.discard(name)
+ return f"Subagent {name} is no longer protected"
+ return f"Subagent {name} was not protected"
+
+
+# Tool instances
+CREATE_DYNAMIC_SUBAGENT_TOOL = CreateDynamicSubAgentTool()
+CLEANUP_DYNAMIC_SUBAGENT_TOOL = CleanupDynamicSubagentTool()
+LIST_DYNAMIC_SUBAGENTS_TOOL = ListDynamicSubagentsTool()
+PROTECT_SUBAGENT_TOOL = ProtectSubagentTool()
+UNPROTECT_SUBAGENT_TOOL = UnprotectSubagentTool()
+
+
+# Shared Context Tools
+@dataclass
+class SendSharedContextToolForMainAgent(FunctionTool):
+ """Tool to send a message to the shared context (visible to all agents)"""
+
+ name: str = "send_shared_context_for_main_agent"
+ description: str = """Send a message to the shared context that will be visible to all subagents and the main agent. You are the main agent, use this to share global information.
+Types: 'message' (to other agents), 'system' (global announcements)."""
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {
+ "context_type": {
+ "type": "string",
+ "description": "Type of context: message (to other agents), system (global announcement)",
+ "enum": ["message", "system"],
+ },
+ "content": {"type": "string", "description": "Content to share"},
+ "target": {
+ "type": "string",
+ "description": "Target agent name or 'all' for broadcast",
+ "default": "all",
+ },
+ },
+ "required": ["context_type", "content", "target"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ context_type = kwargs.get("context_type", "message")
+ content = kwargs.get("content", "")
+ target = kwargs.get("target", "all")
+ if not content:
+ return "Error: content is required"
+ session_id = context.context.event.unified_msg_origin
+ DynamicSubAgentManager.add_shared_context(
+ session_id, "System", context_type, content, target
+ )
+ return f"Shared context updated: [{context_type}] System -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}"
+
+
+@dataclass
+class SendSharedContextTool(FunctionTool):
+ """Tool to send a message to the shared context (visible to all agents)"""
+
+ name: str = "send_shared_context"
+ description: str = """Send a message to the shared context that will be visible to all subagents and the main agent.
+Use this to share information, status updates, or coordinate with other agents.
+Types: 'status' (your current task/progress), 'message' (to other agents)"""
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {
+ "context_type": {
+ "type": "string",
+ "description": "Type of context: status (task progress), message (to other agents)",
+ "enum": ["status", "message"],
+ },
+ "content": {"type": "string", "description": "Content to share"},
+ "sender": {
+ "type": "string",
+ "description": "Sender agent name",
+ "default": "YourName",
+ },
+ "target": {
+ "type": "string",
+ "description": "Target agent name or 'all' for broadcast",
+ "default": "all",
+ },
+ },
+ "required": ["context_type", "content", "sender", "target"],
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ context_type = kwargs.get("context_type", "message")
+ content = kwargs.get("content", "")
+ target = kwargs.get("target", "all")
+ sender = kwargs.get("sender", "YourName")
+ if not content:
+ return "Error: content is required"
+ session_id = context.context.event.unified_msg_origin
+ DynamicSubAgentManager.add_shared_context(
+ session_id, sender, context_type, content, target
+ )
+ return f"Shared context updated: [{context_type}] {sender} -> {target}: {content[:100]}{'...' if len(content) > 100 else ''}"
+
+
+@dataclass
+class ViewSharedContextTool(FunctionTool):
+ """Tool to view the shared context (mainly for main agent)"""
+
+ name: str = "view_shared_context"
+ description: str = """View the shared context between all agents. This shows all messages including status updates,
+inter-agent messages, and system announcements."""
+ parameters: dict = field(
+ default_factory=lambda: {
+ "type": "object",
+ "properties": {},
+ }
+ )
+
+ async def call(self, context, **kwargs) -> str:
+ session_id = context.context.event.unified_msg_origin
+ shared_context = DynamicSubAgentManager.get_shared_context(session_id)
+
+ if not shared_context:
+ return "Shared context is empty."
+
+ lines = ["=== Shared Context ===\n"]
+ for msg in shared_context:
+ ts = time.strftime("%H:%M:%S", time.localtime(msg["timestamp"]))
+ msg_type = msg["type"]
+ sender = msg["sender"]
+ target = msg["target"]
+ content = msg["content"]
+ lines.append(f"[{ts}] [{msg_type}] {sender} -> {target}:")
+ lines.append(f" {content}")
+ lines.append("")
+
+ return "\n".join(lines)
+
+
+# Shared context tool instances
+SEND_SHARED_CONTEXT_TOOL = SendSharedContextTool()
+SEND_SHARED_CONTEXT_TOOL_FOR_MAIN_AGENT = SendSharedContextToolForMainAgent()
+VIEW_SHARED_CONTEXT_TOOL = ViewSharedContextTool()
diff --git a/astrbot/core/subagent_logger.py b/astrbot/core/subagent_logger.py
new file mode 100644
index 0000000000..ebe7ab2de3
--- /dev/null
+++ b/astrbot/core/subagent_logger.py
@@ -0,0 +1,238 @@
+"""
+SubAgent Logger Module
+Provides logging capabilities for dynamic subagents
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from logging.handlers import RotatingFileHandler
+from pathlib import Path
+
+from astrbot import logger as base_logger
+
+
+class LogLevel(Enum):
+ DEBUG = "debug"
+ INFO = "info"
+ WARNING = "warning"
+ ERROR = "error"
+
+
+class LogMode(Enum):
+ CONSOLE_ONLY = "console"
+ FILE_ONLY = "file"
+ BOTH = "both"
+
+
+@dataclass
+class SubAgentLogEntry:
+ timestamp: str
+ level: str
+ session_id: str
+ agent_name: str | None
+ event_type: str
+ message: str
+ details: dict | None = None
+
+ def to_dict(self) -> dict:
+ return {
+ "timestamp": self.timestamp,
+ "level": self.level,
+ "session_id": self.session_id,
+ "agent_name": self.agent_name,
+ "event_type": self.event_type,
+ "message": self.message,
+ "details": self.details,
+ }
+
+
+class SubAgentLogger:
+ """
+ SubAgent Logger
+ Provides two log levels: INFO and DEBUG
+ """
+
+ _log_level: LogLevel = LogLevel.INFO
+ _log_mode: LogMode = LogMode.CONSOLE_ONLY
+ _log_dir: Path = field(default_factory=lambda: Path("logs/subagents"))
+ _session_logs: dict = {}
+ _file_handler = None
+
+ EVENT_CREATE = "agent_create"
+ EVENT_START = "agent_start"
+ EVENT_END = "agent_end"
+ EVENT_ERROR = "agent_error"
+ EVENT_CLEANUP = "cleanup"
+
+ @classmethod
+ def configure(
+ cls, level: str = "info", mode: str = "console", log_dir: str | None = None
+ ) -> None:
+ cls._log_level = LogLevel.DEBUG if level == "debug" else LogLevel.INFO
+ mode_map = {
+ "console": LogMode.CONSOLE_ONLY,
+ "file": LogMode.FILE_ONLY,
+ "both": LogMode.BOTH,
+ }
+ cls._log_mode = mode_map.get(mode.lower(), LogMode.CONSOLE_ONLY)
+ if log_dir:
+ cls._log_dir = Path(log_dir)
+ if cls._log_mode in [LogMode.FILE_ONLY, LogMode.BOTH]:
+ cls._setup_file_handler()
+
+ @classmethod
+ def _setup_file_handler(cls) -> None:
+ if cls._file_handler:
+ return
+ try:
+ cls._log_dir.mkdir(parents=True, exist_ok=True)
+ log_file = (
+ cls._log_dir / f"subagent_{datetime.now().strftime('%Y%m%d')}.log"
+ )
+
+ # 使用 RotatingFileHandler 自动轮转
+ cls._file_handler = RotatingFileHandler(
+ log_file,
+ maxBytes=10 * 1024 * 1024, # 10MB
+ backupCount=5,
+ encoding="utf-8",
+ )
+
+ formatter = logging.Formatter(
+ "%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
+ )
+ cls._file_handler.setFormatter(formatter)
+
+ fl = logging.getLogger("subagent_file")
+ fl.addHandler(cls._file_handler)
+ fl.setLevel(logging.DEBUG)
+ except Exception as e:
+ base_logger.warning(f"[SubAgentLogger] Setup error: {e}")
+
+ @classmethod
+ def should_log(cls, level: str) -> bool:
+ if level == "debug":
+ return cls._log_level == LogLevel.DEBUG
+ return True
+
+ @classmethod
+ def log(
+ cls,
+ session_id: str,
+ event_type: str,
+ message: str,
+ level: str = "info",
+ agent_name: str | None = None,
+ details: dict | None = None,
+ error_trace: str | None = None,
+ ) -> None:
+ if not cls.should_log(level):
+ return
+ entry = SubAgentLogEntry(
+ timestamp=datetime.now().isoformat(),
+ level=level.upper(),
+ session_id=session_id,
+ agent_name=agent_name,
+ event_type=event_type,
+ message=message,
+ details=details,
+ )
+ if session_id not in cls._session_logs:
+ cls._session_logs[session_id] = []
+ cls._session_logs[session_id].append(entry)
+ prefix = f"[{agent_name}]" if agent_name else "[Main]"
+ log_msg = f"{prefix} [{event_type}] {message}"
+ log_func = getattr(base_logger, level, base_logger.info)
+ log_func(log_msg)
+
+ @classmethod
+ def info(
+ cls,
+ session_id: str,
+ event_type: str,
+ message: str,
+ agent_name: str | None = None,
+ details: dict | None = None,
+ ) -> None:
+ cls.log(session_id, event_type, message, "info", agent_name, details)
+
+ @classmethod
+ def debug(
+ cls,
+ session_id: str,
+ event_type: str,
+ message: str,
+ agent_name: str | None = None,
+ details: dict | None = None,
+ ) -> None:
+ cls.log(session_id, event_type, message, "debug", agent_name, details)
+
+ @classmethod
+ def error(
+ cls,
+ session_id: str,
+ event_type: str,
+ message: str,
+ agent_name: str | None = None,
+ details: dict | None = None,
+ ) -> None:
+ cls.log(session_id, event_type, message, "error", agent_name, details)
+
+ @classmethod
+ def get_session_logs(cls, session_id: str) -> list[dict]:
+ return [log.to_dict() for log in cls._session_logs.get(session_id, [])]
+
+ @classmethod
+ def shutdown(cls) -> None:
+ if cls._file_handler:
+ cls._file_handler.close()
+
+
+def log_agent_create(
+ session_id: str, agent_name: str, details: dict | None = None
+) -> None:
+ SubAgentLogger.info(
+ session_id,
+ SubAgentLogger.EVENT_CREATE,
+ f"Agent created: {agent_name}",
+ agent_name,
+ details,
+ )
+
+
+def log_agent_start(session_id: str, agent_name: str, task: str) -> None:
+ SubAgentLogger.info(
+ session_id,
+ SubAgentLogger.EVENT_START,
+ f"Agent started: {task[:80]}...",
+ agent_name,
+ )
+
+
+def log_agent_end(session_id: str, agent_name: str, result: str) -> None:
+ SubAgentLogger.info(
+ session_id,
+ SubAgentLogger.EVENT_END,
+ "Agent completed",
+ agent_name,
+ {"result": str(result)[:200]},
+ )
+
+
+def log_agent_error(session_id: str, agent_name: str, error: str) -> None:
+ SubAgentLogger.error(
+ session_id, SubAgentLogger.EVENT_ERROR, f"Agent error: {error}", agent_name
+ )
+
+
+def log_cleanup(session_id: str, agent_name: str) -> None:
+ SubAgentLogger.info(
+ session_id,
+ SubAgentLogger.EVENT_CLEANUP,
+ f"Agent cleaned: {agent_name}",
+ agent_name,
+ )