diff --git a/assets/demo/feishu_task_workbench_demo.svg b/assets/demo/feishu_task_workbench_demo.svg new file mode 100644 index 000000000..7a591d37d --- /dev/null +++ b/assets/demo/feishu_task_workbench_demo.svg @@ -0,0 +1,45 @@ + + + + + 小GA + GA m2pro 小GA 自媒体助理 + + 机器人 + + + + + 回复 池池:你学一下 Github - lsdefine/GenericAgent 最近做了哪些改动,对你有帮助的列出来 + 任务工作台 + + 状态 + + 当前:✅ 已完成 + + 耗时:91s + + 轮次:3 + + 最近进展 + 1. 任务 1:运行命令/脚本:curl -s "https://api.github.com/repos/lsdefine/GenericAgent... + 2. 任务 2:补 summary,继续分析 GA 上游最近改动的可借鉴点 + 3. 任务 3:筛选关键 commit,获取 goal_mode、lifecycle hook、飞书接口等改动的详细 diff + + 最终输出 + 结论已通过富文本消息发送。 + + + 过程记录 + 第 1 轮 · 运行命令/脚本:curl -s "https://api.github.com/repos/lsdefine/GenericAgent/commits?per_page=20" ... + 第 2 轮 · 补 summary,继续分析 GA 上游最近改动的可借鉴点 + 第 3 轮 · 筛选关键 commit,获取 goal_mode、lifecycle hook、飞书接口等改动的详细 diff + + + + + 回复 池池:你学一下 Github - lsdefine/GenericAgent 最近做了哪些改动,对你有帮助的列出来 + 同步汇报 + 刚拉了 `lsdefine/GenericAgent` 最近 30 个 commit,挑出对 GA 跑法和架构最有帮助的改动。 + + diff --git a/frontends/feishu_cards.py b/frontends/feishu_cards.py new file mode 100644 index 000000000..fee06d21c --- /dev/null +++ b/frontends/feishu_cards.py @@ -0,0 +1,207 @@ +"""飞书交互卡片的展示层工具。 + +这里故意只处理呈现,不改变 agent 的决策流程。目标是让长任务更好读。 +""" + +from __future__ import annotations + +import json +import os +import re +from typing import Iterable + +CARD_OUTPUT_ENABLED = os.environ.get("GA_FEISHU_CARD_OUTPUT", "1").lower() not in {"0", "false", "no", "off"} +MAX_MARKDOWN_BLOCK_CHARS = int(os.environ.get("GA_FEISHU_CARD_BLOCK_CHARS", "5200") or "5200") +MAX_WORKSPACE_DETAIL_CHARS = int(os.environ.get("GA_FEISHU_WORKSPACE_DETAIL_CHARS", "5200") or "5200") + + +def markdown(content: str) -> dict[str, str]: + return {"tag": "markdown", "content": str(content or "")} + + +def hr() -> dict[str, str]: + return {"tag": "hr"} + + +def collapsible_panel(title: str, content: str, *, expanded: bool = False) -> dict: + title = _clean_summary(title, limit=120) + content = str(content or "_(无输出)_").strip() or "_(无输出)_" + if len(content) > MAX_WORKSPACE_DETAIL_CHARS: + content = content[:MAX_WORKSPACE_DETAIL_CHARS].rstrip() + f"\n\n...(已截断, 共 {len(content)} 字符)" + return { + "tag": "collapsible_panel", + "expanded": expanded, + "header": {"title": {"tag": "plain_text", "content": title}}, + "elements": [markdown(content)], + } + + +def card_raw(elements: list[dict], *, title: str = "", template: str = "blue") -> str: + card: dict = { + "schema": "2.0", + "config": {"streaming_mode": False, "width_mode": "fill"}, + "body": {"elements": elements}, + } + title = re.sub(r"\s+", " ", str(title or "")).strip() + if title: + card["header"] = { + "template": template, + "title": {"tag": "plain_text", "content": title[:80]}, + } + return json.dumps(card, ensure_ascii=False) + + +def split_markdown_blocks(text: str, *, limit: int = MAX_MARKDOWN_BLOCK_CHARS) -> list[str]: + text = str(text or "").strip() + if not text: + return ["_(无文本输出)_"] + if len(text) <= limit: + return [text] + chunks: list[str] = [] + remaining = text + while len(remaining) > limit: + split_at = remaining.rfind("\n\n", 0, limit) + if split_at < limit // 2: + split_at = remaining.rfind("\n", 0, limit) + if split_at < limit // 2: + split_at = limit + chunks.append(remaining[:split_at].rstrip()) + remaining = remaining[split_at:].lstrip() + if remaining: + chunks.append(remaining) + return chunks + + +def _clean_summary(summary: str, *, limit: int = 90) -> str: + summary = re.sub(r"\s+", " ", str(summary or "")).strip() + if len(summary) > limit: + return summary[: limit - 3] + "..." + return summary or "继续处理" + + +def _extract_task_titles(text: str, *, limit: int = 6) -> list[str]: + titles: list[str] = [] + patterns = [ + re.compile(r"^\s*(?:#{1,4}\s*)?(?:[🚀✅🎉📌💡🛠️📄📁📊]\s*)?(任务\s*\d+\s*[::].+?)\s*$", re.I), + re.compile(r"^\s*(?:#{1,4}\s*)?(Task\s*\d+\s*[::].+?)\s*$", re.I), + re.compile(r"^\s*(\d+)[.、]\s+(?:\*\*)?([^::\n]{4,80}?)(?:\*\*)?\s*[::]\s*(.+?)\s*$", re.I), + ] + for line in str(text or "").splitlines(): + stripped = line.strip().strip("*") + for pattern in patterns: + match = pattern.match(stripped) + if match: + if len(match.groups()) >= 3 and match.group(1).isdigit(): + title = f"任务 {match.group(1)}:{match.group(2).strip()}:{match.group(3).strip()}" + else: + title = re.sub(r"\s+", " ", match.group(1)).strip() + if title and title not in titles: + titles.append(title) + break + if len(titles) >= limit: + break + return titles + + +def _task_title_from_summary(idx: int, summary: str) -> str: + summary = _clean_summary(summary, limit=70) + summary = re.sub(r"^任务\s*\d+\s*[::]\s*", "", summary) + summary = re.sub(r"^Turn\s*\d+\s*[::·-]\s*", "", summary, flags=re.I) + return f"任务 {idx}:{summary}" + + +def _workflow_markdown(step_summaries: Iterable[tuple[int, str]] | None, final_text: str) -> str: + lines: list[str] = [] + task_titles = _extract_task_titles(final_text) + if task_titles: + lines.append("### 完成清单") + lines.extend(f"{idx}. {title}" for idx, title in enumerate(task_titles, 1)) + elif step_summaries: + lines.append("### 最近进展") + for order, (_turn, summary) in enumerate(list(step_summaries)[-6:], 1): + lines.append(f"{order}. {_task_title_from_summary(order, summary)}") + return "\n".join(lines).strip() + + +def _has_output_heading(text: str) -> bool: + return bool(re.search(r"(?mi)^\s*#{1,6}\s*(?:Outputs?|结论|最终结论|已完成)\s*[::]?\s*$", str(text or ""))) + + +def build_status_card(status: str, *, elapsed: int = 0, turn_count: int = 0, step_summaries=None) -> str: + lines = [f"**{status or '工作中'}**", f"耗时: {elapsed}s"] + if turn_count: + lines.append(f"轮次: {turn_count}") + if step_summaries: + lines.append("") + lines.append("最近进展:") + for idx, summary in list(step_summaries): + lines.append(f"- 第 {idx} 轮:{_clean_summary(summary)}") + template = "green" if "完成" in str(status) else "blue" + return card_raw([markdown("\n".join(lines))], title=str(status or "工作中").replace("...", ""), template=template) + + +def build_progress_card(turn: int, summary: str, detail: str = "", *, compact: bool = False) -> str: + summary = _clean_summary(summary, limit=120) + body = [f"**{summary}**"] + detail = str(detail or "").strip() + if detail: + body.append(detail) + title = f"进展 · 第 {turn} 轮" + return card_raw([markdown("\n\n".join(body))], title=title, template="blue") + + +def build_task_workspace_card( + *, + status: str, + steps: Iterable[tuple[int, str, str]] | None = None, + final_text: str = "", + elapsed: int = 0, + turn_count: int = 0, + max_steps: int = 8, + title: str = "任务工作台", +) -> str: + """生成一张持续更新的任务工作台卡片,每轮进展默认折叠。""" + steps = list(steps or []) + visible_steps = steps[-max_steps:] if max_steps > 0 else steps + hidden = max(0, len(steps) - len(visible_steps)) + status_lines = ["### 状态", f"- 当前:**{status or '工作中'}**"] + if elapsed: + status_lines.append(f"- 耗时:{elapsed}s") + if turn_count: + status_lines.append(f"- 轮次:{turn_count}") + if hidden: + status_lines.append(f"- 早期进展:已折叠,保留最近 {len(visible_steps)} 轮") + + elements: list[dict] = [markdown("\n".join(status_lines))] + workflow_source = steps if _extract_task_titles(final_text) else visible_steps + workflow = _workflow_markdown([(idx, summary) for idx, summary, _detail in workflow_source], final_text) + if workflow: + elements.append(markdown(workflow)) + if final_text: + elements.append(markdown("### 最终输出")) + elements.extend(markdown(chunk) for chunk in split_markdown_blocks(final_text)) + if visible_steps: + elements.append(hr()) + elements.append(markdown("### 过程记录")) + for idx, summary, detail in visible_steps: + elements.append(collapsible_panel(f"第 {idx} 轮 · {summary}", detail)) + template = "green" if "完成" in str(status) else ("red" if "失败" in str(status) or "错误" in str(status) else "blue") + return card_raw(elements, title=title, template=template) + + +def build_final_card( + text: str, + *, + title: str = "已完成", + template: str = "green", + step_summaries: Iterable[tuple[int, str]] | None = None, +) -> str: + elements: list[dict] = [] + workflow = _workflow_markdown(step_summaries, text) + if workflow: + elements.append(markdown(workflow)) + elements.append(hr()) + if step_summaries and not _has_output_heading(text): + elements.append(markdown("### 最终输出")) + elements.extend(markdown(chunk) for chunk in split_markdown_blocks(text)) + return card_raw(elements, title=title, template=template) diff --git a/frontends/feishu_post.py b/frontends/feishu_post.py new file mode 100644 index 000000000..ab334cb64 --- /dev/null +++ b/frontends/feishu_post.py @@ -0,0 +1,162 @@ +"""飞书富文本消息工具。""" + +from __future__ import annotations + +import json +import os +import re +from typing import Any + + +AUTO_POST_MIN_CHARS = int(os.environ.get("GA_FEISHU_POST_MIN_CHARS", "420") or "420") +AUTO_POST_MIN_LINES = int(os.environ.get("GA_FEISHU_POST_MIN_LINES", "7") or "7") +MAX_POST_ROWS = int(os.environ.get("GA_FEISHU_POST_MAX_ROWS", "120") or "120") +MAX_ROW_CHARS = int(os.environ.get("GA_FEISHU_POST_MAX_ROW_CHARS", "1200") or "1200") + +_BOLD_RE = re.compile(r"\*\*([^*\n][^*\n]*?)\*\*") +_HEADING_RE = re.compile(r"^\s{0,3}#{1,6}\s+(.+?)\s*$") +_STRUCTURED_RE = re.compile( + r"(^|\n)\s*(#{1,6}\s+\S|[-*]\s+\S|\d+[.)]\s+\S|" + r"正式准入[::]|跳过[::]|证据记录[::]|反馈记录[::]|说明[::]|" + r"Dream\s*认知精炼报告|```|\|.+\|)", + re.IGNORECASE, +) +_OPERATIONAL_CARD_ENABLED = os.environ.get("GA_FEISHU_OPERATIONAL_CARD", "1").lower() not in { + "0", + "false", + "no", + "off", +} +_OPERATIONAL_RE = re.compile( + r"(PID|进程|Gateway|gateway|重启|已启动|已重启|连接已恢复|飞书连接|" + r"Feishu|Weixin|微信|平台|connected|运行正常|系统运行|状态稳定|" + r"验证|测试|pytest|passed|score\s*\d+|findings|push\s*成功|" + r"已同步|origin/main|HEAD|commit|提交|工作区干净|报错|失败)", + re.IGNORECASE, +) + + +def _strip_markdown(text: str) -> str: + text = re.sub(r"^\s{0,3}#{1,6}\s+", "", str(text or "")).strip() + text = text.replace("**", "").replace("`", "") + return re.sub(r"\s+", " ", text).strip() + + +def derive_post_title(text: str, fallback: str = "GA 回复") -> str: + """从第一行有意义的内容里取一个稳定标题。""" + for line in str(text or "").splitlines(): + title = _strip_markdown(line) + if not title: + continue + if len(title) > 80: + return fallback + return title + return fallback + + +def should_send_post(text: str, *, force: bool = False) -> bool: + """判断回复是否更适合用富文本消息发送。""" + text = str(text or "").strip() + if not text: + return False + if force: + return True + lines = [line for line in text.splitlines() if line.strip()] + if len(lines) >= AUTO_POST_MIN_LINES: + return True + if _STRUCTURED_RE.search(text): + return True + return len(text) >= AUTO_POST_MIN_CHARS and len(lines) >= 3 + + +def should_send_operational_card(text: str) -> bool: + """判断短状态/短报告是否适合用卡片呈现。""" + if not _OPERATIONAL_CARD_ENABLED: + return False + text = str(text or "").strip() + if not text: + return False + return bool(_OPERATIONAL_RE.search(text)) + + +def derive_operational_card_title(text: str, fallback: str = "状态汇报") -> str: + text = str(text or "") + if re.search(r"重启|已启动|连接已恢复", text, re.IGNORECASE): + return "重启汇报" + if re.search(r"验证|测试|pytest|passed|score\s*\d+|findings", text, re.IGNORECASE): + return "验证结果" + if re.search(r"push|已同步|origin/main|HEAD|commit|提交|工作区干净", text, re.IGNORECASE): + return "同步汇报" + if re.search(r"报错|失败", text, re.IGNORECASE): + return "异常汇报" + return fallback + + +def _text_node(text: str, *, bold: bool = False) -> dict[str, Any]: + node: dict[str, Any] = {"tag": "text", "text": text} + if bold: + node["style"] = ["bold"] + return node + + +def _inline_nodes(line: str, *, bold_line: bool = False) -> list[dict[str, Any]]: + nodes: list[dict[str, Any]] = [] + pos = 0 + for match in _BOLD_RE.finditer(line): + if match.start() > pos: + nodes.append(_text_node(line[pos:match.start()], bold=bold_line)) + nodes.append(_text_node(match.group(1), bold=True)) + pos = match.end() + if pos < len(line): + nodes.append(_text_node(line[pos:], bold=bold_line)) + return nodes or [_text_node(line or " ")] + + +def _line_nodes(line: str, *, in_code: bool = False) -> list[dict[str, Any]]: + line = str(line or "") + if len(line) > MAX_ROW_CHARS: + line = line[:MAX_ROW_CHARS].rstrip() + "..." + if in_code: + return [_text_node(line or " ")] + heading = _HEADING_RE.match(line) + if heading: + return _inline_nodes(_strip_markdown(heading.group(1)), bold_line=True) + return _inline_nodes(line) + + +def _content_rows(text: str) -> list[list[dict[str, Any]]]: + rows: list[list[dict[str, Any]]] = [] + in_code = False + blank_pending = False + for raw_line in str(text or "").splitlines(): + line = raw_line.rstrip() + if line.strip().startswith("```"): + in_code = not in_code + continue + if not line.strip(): + blank_pending = bool(rows) + continue + if blank_pending and len(rows) < MAX_POST_ROWS - 1: + rows.append([_text_node(" ")]) + blank_pending = False + rows.append(_line_nodes(line, in_code=in_code)) + if len(rows) >= MAX_POST_ROWS: + rows.append([_text_node("...(内容较长,已截断)")]) + break + return rows or [[_text_node("(无内容)")]] + + +def build_post_payload(text: str, *, title: str | None = None) -> str: + """把普通文本或轻量 Markdown 转成飞书 post 消息。""" + text = str(text or "").strip() + post_title = title or derive_post_title(text) + lines = text.splitlines() + if lines and _strip_markdown(lines[0]) == post_title: + text = "\n".join(lines[1:]).strip() + payload = { + "zh_cn": { + "title": post_title, + "content": _content_rows(text), + } + } + return json.dumps(payload, ensure_ascii=False) diff --git a/frontends/feishu_task_stream.py b/frontends/feishu_task_stream.py new file mode 100644 index 000000000..0a9c51b29 --- /dev/null +++ b/frontends/feishu_task_stream.py @@ -0,0 +1,635 @@ +import json +import os +import re +import threading +import time + +from frontends.feishu_cards import build_progress_card, build_status_card, build_task_workspace_card +from frontends.feishu_post import ( + build_post_payload, + derive_operational_card_title, + derive_post_title, + should_send_operational_card, + should_send_post, +) + + +WORKSPACE_CARD_ENABLED = os.environ.get("GA_FEISHU_TASK_WORKSPACE_CARD", "1").lower() not in { + "0", + "false", + "no", + "off", +} +WORKSPACE_MAX_STEPS = int(os.environ.get("GA_FEISHU_WORKSPACE_MAX_STEPS", "8") or "8") +WORKSPACE_FINAL_CHARS = int(os.environ.get("GA_FEISHU_WORKSPACE_FINAL_CHARS", "8500") or "8500") +WORKSPACE_PROGRESS_AFTER_TURNS = int(os.environ.get("GA_FEISHU_WORKSPACE_PROGRESS_AFTER_TURNS", "2") or "2") +WORKSPACE_PROGRESS_AFTER_SEC = float(os.environ.get("GA_FEISHU_WORKSPACE_PROGRESS_AFTER_SEC", "8") or "8") + +_CASUAL_FINAL_MAX_CHARS = int(os.environ.get("GA_FEISHU_CASUAL_FINAL_MAX_CHARS", "180") or "180") +_STRUCTURED_FINAL_RE = re.compile( + r"(^|\n)\s*(#{1,6}\s+|[-*]\s+|\d+[.、]\s+|```|>\s+)|" + r"(任务\s*\d+\s*[::]|Tool Calls|Outputs?|结论|报告|汇报|状态|PID|文件位置|使用方法)", + re.IGNORECASE, +) + + +def split_message(text, limit=7000): + text = str(text or "") + if len(text) <= limit: + return [text] + chunks = [] + remaining = text + while len(remaining) > limit: + split_at = remaining.rfind("\n", 0, limit) + if split_at < limit // 2: + split_at = remaining.rfind(" ", 0, limit) + if split_at < limit // 2: + split_at = limit + chunks.append(remaining[:split_at].rstrip()) + remaining = remaining[split_at:].lstrip() + if remaining: + chunks.append(remaining) + return chunks + + +def natural_group_final(text): + """去掉群聊结论里偏机器感的标题壳。""" + text = str(text or "").strip() + text = re.sub(r"^\s*(?:\*\*)?(?:✅\s*)?(?:结论|最终结论|完成|已完成)(?:\*\*)?\s*[::]?\s*", "", text, flags=re.IGNORECASE) + text = re.sub(r"^\s*[-=]{3,}\s*", "", text) + return text.strip() or "_(无文本输出)_" + + +def should_send_plain_final(text, *, turn_count=0, group_compact=False): + """判断最终回复是否应该保持普通文本气泡。""" + raw = str(text or "").strip() + visible = natural_group_final(raw) + if not visible or len(visible) > _CASUAL_FINAL_MAX_CHARS: + return False + if turn_count: + return False + if should_send_operational_card(raw) or should_send_post(raw): + return False + if _STRUCTURED_FINAL_RE.search(raw): + return False + if group_compact: + return True + # 单聊里的短回复也不要强行卡片化,这样更像正常对话。 + return "\n" not in visible + + +def fmt_tool_call(tc, max_args=240): + name = tc.get("tool_name", "?") + args = {k: v for k, v in (tc.get("args") or {}).items() if not str(k).startswith("_")} + arg_text = json.dumps(args, ensure_ascii=False, default=str) + if len(arg_text) > max_args: + arg_text = arg_text[:max_args] + "..." + return f"- `{name}`({arg_text})" + + +def _clip_line(text, *, limit=520): + text = re.sub(r"\s+", " ", str(text or "")).strip() + if len(text) > limit: + return text[: limit - 3].rstrip() + "..." + return text + + +def _path_tail(path): + text = str(path or "").strip() + if not text: + return "" + parts = re.split(r"[/\\]", text) + return parts[-1] or text + + +def _human_tool_action(tc): + name = str(tc.get("tool_name") or "?") + args = tc.get("args") or {} + if name in {"file_read", "read_file"}: + return f"读取 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`" + if name in {"file_patch", "apply_patch", "edit_file"}: + return f"更新 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`" + if name in {"file_write", "write_file"}: + return f"写入 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`" + if name in {"code_run", "execute_code", "run_command"}: + script = args.get("script") or args.get("code") or args.get("cmd") or "" + return f"运行命令/脚本:{_clip_line(script, limit=120)}" + if name == "delegate_task": + tasks = args.get("tasks") or [] + return f"并行调研 {len(tasks)} 个子任务" if isinstance(tasks, list) else "启动并行调研" + if name in {"learning_pipeline", "learning_asset_update"}: + return "沉淀学习资产" + if name in {"cognitive_dream", "cognitive_store", "cognitive_retrieval"}: + return "读取或更新认知记忆" + return f"调用 `{name}`" + + +def _extract_result_text(value): + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, dict): + for key in ("summary", "msg", "message", "output", "stdout", "stderr", "error", "content", "final", "result"): + item = value.get(key) + if item: + if isinstance(item, (dict, list)): + return json.dumps(item, ensure_ascii=False, default=str) + return str(item) + status = value.get("status") + if status: + return f"状态={status}" + return json.dumps(value, ensure_ascii=False, default=str) + if isinstance(value, list): + if not value: + return "" + snippets = [_extract_result_text(item) for item in value[:3]] + return ";".join(s for s in snippets if s) + return str(value) + + +def _human_tool_result(result): + text = _extract_result_text(result) + text = re.sub(r"[\s\S]*?", "", text, flags=re.IGNORECASE).strip() + text = re.sub(r"```[\s\S]{900,}?```", "[长代码块已省略]", text) + text = re.sub(r"\{\\?\"[^\\n]{900,}", "[结构化输出已省略]", text) + lines = [line.strip() for line in text.splitlines() if line.strip()] + if not lines: + return "" + useful = [] + for line in lines: + if line.startswith("[Info] Final response"): + continue + if line.startswith("[Action]") or line.startswith("[Status]") or line.startswith("[Warn]"): + useful.append(_clip_line(line, limit=360)) + continue + if len(line) < 220 or any(token in line for token in ("已", "完成", "失败", "错误", "保存", "路径", "写入", "读取")): + useful.append(_clip_line(line, limit=360)) + if len(useful) >= 3: + break + return "\n".join(f"- {item}" for item in useful[:3]) + + +def _strip_list_prefix(text): + return re.sub(r"^\s*[-*]\s*", "", str(text or "")).strip() + + +def build_visible_self_talk(summary, tool_calls=None, tool_results=None, content=""): + """生成给用户看的单轮小结。 + + 这不是原始思维链,而是说明本轮做了什么、留下了什么证据、下一步怎么走。 + """ + summary = _clip_line(summary, limit=180) + actions = [_human_tool_action(tc) for tc in (tool_calls or [])[:2]] + result_lines = [] + for result in tool_results or []: + text = _human_tool_result(result) + if not text: + continue + for line in text.splitlines(): + clean = _strip_list_prefix(line) + if clean: + result_lines.append(clean) + if len(result_lines) >= 2: + break + if len(result_lines) >= 2: + break + visible_content = _clip_line(content, limit=220) + + finished = summary or "整理当前信息" + result = "" + if actions: + finished = ";".join(actions) + + if result_lines: + result = ";".join(result_lines) + elif visible_content and visible_content != "...": + result = visible_content + else: + result = "本轮暂无可见产物,已更新阶段判断。" + + if actions: + next_step = "基于这些结果继续推进下一轮。" + elif visible_content and visible_content != "...": + next_step = "等待用户反馈或按当前输出继续展开。" + else: + next_step = "继续补齐证据后再给结论。" + + lines = [ + f"- 本轮完成:{finished}", + f"- 当前结果:{result}", + f"- 下一步:{next_step}", + ] + return "### 小结\n" + "\n".join(lines) + + +def humanize_step_summary(summary, tool_calls=None): + summary = re.sub(r"\s+", " ", str(summary or "")).strip() + match = re.match(r"调用工具\s*([A-Za-z_][\w.-]*)\s*,?\s*args:\s*(.+)$", summary) + if match: + name = match.group(1) + args_text = match.group(2) + if tool_calls: + return _human_tool_action(tool_calls[0]) + if name in {"file_read", "read_file"}: + path_match = re.search(r"'path': '([^']+)'|\"path\": \"([^\"]+)\"", args_text) + return f"读取 `{_path_tail((path_match.group(1) or path_match.group(2)) if path_match else '') or '文件'}`" + if name in {"file_patch", "apply_patch", "edit_file"}: + path_match = re.search(r"'path': '([^']+)'|\"path\": \"([^\"]+)\"", args_text) + return f"更新 `{_path_tail((path_match.group(1) or path_match.group(2)) if path_match else '') or '文件'}`" + if name == "delegate_task": + return "并行调研多个子任务" + return f"调用 `{name}`" + return _clip_line(summary, limit=120) + + +def build_step_detail(resp, tool_calls, display_text, *, tool_results=None, include_raw_thinking=False, detail_limit=6000): + """生成一轮可见进展。 + + 默认不展示原始思维链。可见的“思考流”只保留操作痕迹:本轮摘要、动作、产物和用户可见输出。 + """ + parts = [] + thinking = (getattr(resp, "thinking", "") or "").strip() if resp else "" + content = display_text((getattr(resp, "content", "") or "")).strip() if resp else "" + turn_title = humanize_step_summary("", tool_calls) if tool_calls else "" + turn_summary = build_visible_self_talk( + turn_title, + tool_calls, + tool_results=tool_results, + content=content, + ) + if include_raw_thinking and thinking: + parts.append(f"### 原始思考\n{thinking}") + if turn_title: + parts.append(f"### 本轮完成\n{turn_title}") + if tool_calls: + raw_trace = os.environ.get("GA_FEISHU_RAW_TOOL_TRACE", "").lower() in {"1", "true", "yes"} + if raw_trace: + parts.append("### 执行动作\n" + "\n".join(fmt_tool_call(tc) for tc in tool_calls)) + else: + lines = [] + for tc in tool_calls: + name = str(tc.get("tool_name") or "?") + lines.append(f"- `{name}`: {_human_tool_action(tc)}") + parts.append("### 执行动作\n" + "\n".join(lines)) + result_lines = [] + for result in tool_results or []: + line = _human_tool_result(result) + if line: + result_lines.append(line) + if len(result_lines) >= 3: + break + if result_lines: + parts.append("### 产物/证据\n" + "\n".join(result_lines)) + if content and content != "...": + parts.append(f"### 可见输出\n{content}") + # 小结是本轮收口,放在动作、产物和可见输出之后更自然。 + parts.append(turn_summary) + detail = "\n\n".join(parts).strip() or "_(无可见输出)_" + if len(detail) > detail_limit: + detail = detail[:detail_limit] + f"\n\n...(已截断, 共 {len(detail)} 字符)" + return detail + + +class FeishuTaskStream: + """飞书长任务流。 + + 长任务应该像一个持续更新的工作台,而不是散落一地的进度消息。 + 默认只更新同一张卡片,每轮过程折叠;短群聊回复仍然走自然文本。 + """ + + def __init__( + self, + receive_id, + rid_type, + *, + send_raw, + patch_card, + display_text, + detail_limit=6000, + final_chunk_limit=7000, + status_tail=5, + quiet=False, + group_compact=False, + group_progress_after_sec=12, + send_initial_status=True, + reply_to=None, + send_reply=None, + workspace_card=WORKSPACE_CARD_ENABLED, + workspace_max_steps=WORKSPACE_MAX_STEPS, + workspace_final_chars=WORKSPACE_FINAL_CHARS, + workspace_progress_after_turns=WORKSPACE_PROGRESS_AFTER_TURNS, + workspace_progress_after_sec=None, + ): + self.rid = receive_id + self.rtype = rid_type + self._send_raw = send_raw + self._patch_card = patch_card + self._display_text = display_text + self.detail_limit = detail_limit + self.final_chunk_limit = final_chunk_limit + self.status_tail = status_tail + self.status = "🤔 思考中..." + self.msg_id = None + self.started_at = time.time() + self.turn_count = 0 + self.step_summaries = [] + self.final_message_ids = [] + self.quiet = quiet + self.group_compact = group_compact + self.group_progress_after_sec = group_progress_after_sec + self.send_initial_status = send_initial_status + self.reply_to = reply_to + self._send_reply = send_reply + self.workspace_card = bool(workspace_card and not quiet) + self.workspace_max_steps = workspace_max_steps + self.workspace_final_chars = workspace_final_chars + self.workspace_progress_after_turns = max(1, int(workspace_progress_after_turns or 1)) + if workspace_progress_after_sec is None: + workspace_progress_after_sec = group_progress_after_sec if group_compact else WORKSPACE_PROGRESS_AFTER_SEC + self.workspace_progress_after_sec = max(0.0, float(workspace_progress_after_sec or 0.0)) + self.steps = [] + self._lock = threading.RLock() + self._terminal = False + + def _send_message(self, payload, msg_type="interactive"): + if self.reply_to and self._send_reply: + return self._send_reply(self.reply_to, payload, msg_type) + return self._send_raw(self.rid, payload, msg_type, self.rtype) + + def _send_text_fallback(self, text): + msg_id = self._send_message( + json.dumps({"text": str(text or "_(无文本输出)_")}, ensure_ascii=False), + "text", + ) + if msg_id: + self.final_message_ids.append(msg_id) + return msg_id + + def _send_plain_chunks(self, text, *, limit=3500): + sent = 0 + for chunk in split_message(str(text or "_(无文本输出)_"), min(self.final_chunk_limit, limit)): + msg_id = self._send_message( + json.dumps({"text": chunk}, ensure_ascii=False), + "text", + ) + if msg_id: + self.final_message_ids.append(msg_id) + sent += 1 + return sent + + def _send_post_chunks(self, text, *, title="结论"): + sent = 0 + chunks = split_message(str(text or "_(无文本输出)_"), self.final_chunk_limit) + total = len(chunks) + for idx, chunk in enumerate(chunks, 1): + chunk_title = title if total == 1 else f"{title} ({idx}/{total})" + msg_id = self._send_message(build_post_payload(chunk, title=chunk_title), "post") + if msg_id: + self.final_message_ids.append(msg_id) + sent += 1 + return sent + + def _final_should_post(self, text): + return should_send_operational_card(text) or should_send_post(text) + + def _final_post_title(self, text, *, fallback="结论"): + if should_send_operational_card(text): + return derive_operational_card_title(text) + return derive_post_title(text, fallback=fallback) + + def _status_card(self): + elapsed = int(time.time() - self.started_at) + return build_status_card( + self.status, + elapsed=elapsed, + turn_count=self.turn_count, + step_summaries=self.step_summaries[-self.status_tail:], + ) + + def _push_status(self): + payload = self._status_card() + if self.msg_id: + ok = self._patch_card(self.msg_id, payload) + if ok: + return + self.msg_id = self._send_raw(self.rid, payload, "interactive", self.rtype) + + def _workspace_card(self, final_text=""): + elapsed = int(time.time() - self.started_at) + return build_task_workspace_card( + status=self.status, + steps=self.steps, + final_text=final_text, + elapsed=elapsed, + turn_count=self.turn_count, + max_steps=self.workspace_max_steps, + ) + + def _push_workspace(self, final_text=""): + payload = self._workspace_card(final_text=final_text) + if self.msg_id: + ok = self._patch_card(self.msg_id, payload) + if ok: + return self.msg_id + self.msg_id = self._send_message(payload, "interactive") + return self.msg_id + + def _should_show_workspace(self): + if self.msg_id: + return True + if self.turn_count >= self.workspace_progress_after_turns: + return True + return (time.time() - self.started_at) >= self.workspace_progress_after_sec + + def start(self): + with self._lock: + if self._terminal or self.quiet or self.group_compact or not self.send_initial_status: + return + if self.workspace_card: + self._push_workspace() + else: + self._push_status() + + def pulse(self, status): + with self._lock: + if self._terminal or self.quiet: + return + self.status = status + if self.workspace_card: + self._push_workspace() + elif not self.group_compact: + self._push_status() + + def step(self, summary, detail=""): + with self._lock: + if self._terminal: + return + self.turn_count += 1 + summary = re.sub(r"\s+", " ", str(summary or f"第 {self.turn_count} 轮")).strip() + if len(summary) > 120: + summary = summary[:117] + "..." + self.step_summaries.append((self.turn_count, summary)) + self.status = f"⏳ 工作中 · 第 {self.turn_count} 轮" + self.steps.append((self.turn_count, summary, str(detail or "_(无输出)_"))) + + if self.quiet: + # 群聊里先安静工作,最后给结论,避免刷屏。 + return + if self.workspace_card: + if self._should_show_workspace(): + self._push_workspace() + return + if self.group_compact: + elapsed = time.time() - self.started_at + if elapsed < self.group_progress_after_sec and self.turn_count <= 1: + return + detail = str(detail or "") + tool_lines = [] + for line in detail.splitlines(): + if line.strip().startswith("- `"): + tool_lines.append(line.strip()) + if len(tool_lines) >= 3: + break + lines = [f"**进展 · 第 {self.turn_count} 轮**", summary] + if tool_lines: + lines.append("") + lines.extend(tool_lines) + content = "\n".join(lines) + for chunk in split_message(content, min(self.final_chunk_limit, 3500)): + self._send_message( + build_progress_card(self.turn_count, summary, chunk, compact=True), + "interactive", + ) + return + + # 单聊完整模式 + detail = str(detail or "_(无输出)_") + if len(detail) > self.detail_limit: + detail = detail[:self.detail_limit] + f"\n\n...(已截断, 共 {len(detail)} 字符)" + content = f"**摘要**: {summary}\n\n{detail}" + for chunk in split_message(content, self.final_chunk_limit): + self._send_message( + build_progress_card(self.turn_count, summary, chunk), + "interactive", + ) + self._push_status() + + def done(self, text): + with self._lock: + self._terminal = True + visible = self._display_text(text) + if visible == "...": + visible = "_(无文本输出)_" + print( + f"[feishu-task-stream] final: chars={len(visible)} turns={self.turn_count} " + f"group_compact={self.group_compact} workspace={self.workspace_card}", + flush=True, + ) + if should_send_plain_final(visible, turn_count=self.turn_count, group_compact=self.group_compact): + msg_id = self._send_message( + json.dumps({"text": natural_group_final(visible)}, ensure_ascii=False), + "text", + ) + if msg_id: + self.final_message_ids.append(msg_id) + else: + self._send_text_fallback(natural_group_final(visible)) + self.status = "✅ 已完成" + return + if self.workspace_card and self.msg_id: + visible = natural_group_final(visible) if self.group_compact else visible + if len(visible) <= self.workspace_final_chars: + self.status = "✅ 已完成" + final_should_post = self._final_should_post(visible) + final_text = "结论已通过富文本消息发送。" if final_should_post else visible + if not self._push_workspace(final_text=final_text) and not self.group_compact: + self._send_text_fallback(visible) + if final_should_post: + if not self._send_post_chunks(visible, title=self._final_post_title(visible)): + self._send_text_fallback(visible) + elif self.group_compact: + self._send_plain_chunks(visible) + if self.group_compact: + self.status = "✅ 已完成" + return + self.status = "✅ 已完成 · 结论另发" + if not self._push_workspace(final_text="结论内容较长,已拆成后续消息发送。"): + print("[feishu-task-stream] workspace card send failed before final chunks", flush=True) + if self.group_compact: + if not self._send_post_chunks(visible, title=self._final_post_title(visible)): + self._send_text_fallback(visible) + return + if not self._send_post_chunks(visible, title=self._final_post_title(visible)): + self._send_text_fallback(visible) + return + if self.group_compact: + visible = natural_group_final(visible) + operational_card = should_send_operational_card(visible) + use_post = operational_card or should_send_post(visible) + chunks = split_message(visible, self.final_chunk_limit if use_post else min(self.final_chunk_limit, 3500)) + card_title = ( + derive_operational_card_title(visible) + if operational_card + else derive_post_title(visible, fallback="已完成") + ) + total = len(chunks) + for idx, chunk in enumerate(chunks, 1): + if use_post: + title = card_title if total == 1 else f"{card_title} ({idx}/{total})" + msg_id = self._send_message( + build_post_payload(chunk, title=title), + "post", + ) + if msg_id: + self.final_message_ids.append(msg_id) + continue + msg_id = self._send_message( + json.dumps({"text": chunk}, ensure_ascii=False), + "text", + ) + if msg_id: + self.final_message_ids.append(msg_id) + if not self.final_message_ids: + self._send_text_fallback(visible) + self.status = "✅ 已完成" + return + title = "结论" if self.quiet else self._final_post_title(visible, fallback="已完成") + self._send_post_chunks(visible, title=title) + if not self.final_message_ids: + self._send_text_fallback(visible) + self.status = "✅ 已完成 · 结论已发送" + if not self.quiet: + self._push_status() + + def fail(self, msg): + with self._lock: + self._terminal = True + if self.workspace_card and self.msg_id: + self.status = f"❌ {msg}" + self._push_workspace() + return + if self.quiet or self.group_compact: + self._send_message( + json.dumps({"text": f"出错了:{msg}"}, ensure_ascii=False), + "text", + ) + return + self.status = f"❌ {msg}" + self._push_status() + + def cancel(self, msg="已停止当前任务"): + with self._lock: + self._terminal = True + if self.workspace_card and self.msg_id: + self.status = f"⏹️ {msg}" + self._push_workspace() + return + if self.quiet or self.group_compact: + self._send_message( + json.dumps({"text": str(msg or "已停止当前任务")}, ensure_ascii=False), + "text", + ) + return + self.status = f"⏹️ {msg}" + self._push_status() diff --git a/frontends/fsapp.py b/frontends/fsapp.py index ad959f660..ef3d3d869 100644 --- a/frontends/fsapp.py +++ b/frontends/fsapp.py @@ -80,6 +80,11 @@ def _ensure_runtime_paths(): _ensure_runtime_paths() from agentmain import GeneraticAgent from frontends.chatapp_common import AgentChatMixin, FILE_HINT, split_text +from frontends.feishu_task_stream import ( + FeishuTaskStream, + build_step_detail as build_stream_step_detail, + humanize_step_summary, +) _TAG_PATS = [r"<" + t + r">.*?" for t in ("thinking", "summary", "tool_use", "file_content")] _IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"} @@ -619,91 +624,12 @@ def _fmt_tool_call(tc): def _build_step_detail(resp, tool_calls): - """从 LLM response + tool_calls 组装单步展开详情(纯函数)。""" - parts = [] - thinking = (getattr(resp, 'thinking', '') or '').strip() if resp else '' - if thinking: - parts.append(f"### 💭 Thinking\n{thinking}") - if tool_calls: - parts.append("### 🛠 Tool Calls\n" + "\n".join(_fmt_tool_call(tc) for tc in tool_calls)) - content = _display_text((getattr(resp, 'content', '') or '')).strip() if resp else '' - if content and content != '...': - parts.append(f"### 📝 Output\n{content}") - return "\n\n".join(parts) - - -class _TaskCard: - """飞书任务卡片:单卡片持续 patch;每步一个独立折叠面板(header 显示 summary,展开看详情)。""" - _DETAIL_LIMIT = 8000 - - def __init__(self, receive_id, rid_type): - self.rid, self.rtype = receive_id, rid_type - self.steps = [] # [(summary, detail), ...] - self.status = "🤔 思考中..." - self.final = None - self.msg_id = None - self.start_fallback_sent = False - self.final_fallback_sent = False - - def _step_panel(self, idx, summary, detail): - detail = detail or "_(无输出)_" - if len(detail) > self._DETAIL_LIMIT: - detail = detail[:self._DETAIL_LIMIT] + f"\n\n…(已截断,共 {len(detail)} 字符)" - return { - "tag": "collapsible_panel", "expanded": False, - "header": {"title": {"tag": "plain_text", "content": f"Turn {idx} · {summary}"}}, - "elements": [{"tag": "markdown", "content": detail}], - } - - def _build(self): - els = [{"tag": "markdown", "content": f"**{self.status}**"}] - for i, (s, d) in enumerate(self.steps, 1): - els.append(self._step_panel(i, s, d)) - if self.final: - els += [{"tag": "hr"}, {"tag": "markdown", "content": self.final}] - return _card_raw(els) - - def _push(self): - card = self._build() - if self.msg_id: - ok = _patch_card(self.msg_id, card) - else: - self.msg_id = _send_raw(self.rid, card, "interactive", self.rtype) - ok = bool(self.msg_id) - return ok - - def _fallback_text(self, text, *, final=False): - attr = "final_fallback_sent" if final else "start_fallback_sent" - if getattr(self, attr): - return - setattr(self, attr, True) - send_message(self.rid, text, receive_id_type=self.rtype) - - # ── 公开接口 ── - - def start(self): - if not self._push(): - self._fallback_text("🤔 思考中...") - - def step(self, summary, detail=""): - self.steps.append((summary, detail)) - self.status = f"⏳ 工作中 · Turn {len(self.steps)}" - self._push() - - def done(self, text): - self.status = "✅ 已完成" - self.final = text or "_(无文本输出)_" - if not self._push(): - self._fallback_text(_display_text(text), final=True) - - def fail(self, msg): - self.status = f"❌ {msg}" - if not self._push(): - self._fallback_text(f"❌ {msg}", final=True) + """从 LLM response + tool_calls 组装用户可读的单步详情。""" + return build_stream_step_detail(resp, tool_calls or [], _display_text) -def _make_task_hook(card, task_id, on_final): - """飞书任务 hook:每轮 patch 卡片状态;结束触发 on_final(raw) 处理附件。""" +def _make_task_hook(task_stream, task_id, on_final): + """飞书任务 hook:每轮更新同一个任务工作台;结束后处理附件。""" def hook(ctx): try: parent = getattr(ctx.get("self"), "parent", None) @@ -714,8 +640,15 @@ def hook(ctx): raw = resp.content if hasattr(resp, 'content') else str(resp) on_final(raw) elif ctx.get('summary'): - detail = _build_step_detail(ctx.get('response'), ctx.get('tool_calls') or []) - card.step(ctx['summary'], detail) + tool_calls = ctx.get('tool_calls') or [] + detail = build_stream_step_detail( + ctx.get('response'), + tool_calls, + _display_text, + tool_results=ctx.get('tool_results') or [], + ) + summary = humanize_step_summary(ctx.get('summary') or "", tool_calls) + task_stream.step(summary, detail) except Exception as e: print(f"[fs hook] error: {e}") return hook @@ -744,7 +677,15 @@ async def run_agent(self, chat_id, text, *, receive_id=None, receive_id_type="op rid = receive_id or chat_id task_id = f"{chat_id}_{uuid.uuid4().hex}" hook_key = f"fs_{task_id}" - card = _TaskCard(rid, receive_id_type) + task_stream = FeishuTaskStream( + rid, + receive_id_type, + send_raw=_send_raw, + patch_card=_patch_card, + display_text=_display_text, + group_compact=(receive_id_type == "chat_id"), + send_initial_status=False, + ) result = {"raw": None, "sent": False} finish_lock = threading.Lock() @@ -754,17 +695,18 @@ def _finish(raw): return result["raw"] = raw result["sent"] = True - card.done(_display_text(raw)) + task_stream.done(raw) _send_generated_files(rid, raw, receive_id_type=receive_id_type) try: - await asyncio.to_thread(card.start) + await asyncio.to_thread(task_stream.start) if not hasattr(self.agent, '_turn_end_hooks'): self.agent._turn_end_hooks = {} - self.agent._turn_end_hooks[hook_key] = _make_task_hook(card, task_id, _finish) + self.agent._turn_end_hooks[hook_key] = _make_task_hook(task_stream, task_id, _finish) self.agent._fs_active_task_id = task_id dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source, images=images or None) start = time.time() + last_pulse = 0 while state["running"] and not result["sent"]: try: item = await asyncio.to_thread(dq.get, True, 1) @@ -773,16 +715,21 @@ def _finish(raw): if item and "done" in item: await asyncio.to_thread(_finish, item.get("done", "")) break + if item and item.get("next"): + elapsed = int(time.time() - start) + if elapsed - last_pulse >= 15: + await asyncio.to_thread(task_stream.pulse, f"⏳ 工作中 · {elapsed}s") + last_pulse = elapsed if time.time() - start > AGENT_TIMEOUT_SEC: self.agent.abort() - await asyncio.to_thread(card.fail, "任务超时") + await asyncio.to_thread(task_stream.fail, "任务超时,已停止当前任务") break if not state["running"] and not result["sent"]: self.agent.abort() - await asyncio.to_thread(card.fail, "已停止") + await asyncio.to_thread(task_stream.cancel, "已停止当前任务") except Exception as e: traceback.print_exc() - await asyncio.to_thread(card.fail, f"错误: {e}") + await asyncio.to_thread(task_stream.fail, f"错误: {e}") finally: if getattr(self.agent, "_fs_active_task_id", None) == task_id: try: diff --git a/tests/test_feishu_post.py b/tests/test_feishu_post.py new file mode 100644 index 000000000..f7e3f7bb9 --- /dev/null +++ b/tests/test_feishu_post.py @@ -0,0 +1,39 @@ +import json + +from frontends.feishu_post import build_post_payload, derive_post_title, should_send_post + + +def test_short_casual_reply_stays_text(): + assert not should_send_post("可以") + assert not should_send_post("收到,我来处理。") + + +def test_structured_report_uses_post(): + report = "\n".join([ + "Dream 认知精炼报告 | 2026-05-15", + "", + "旁路复盘: 1 条", + "1. [L2] 飞书对话: 当天群聊应进入强记忆。", + "", + "证据记录: 8 条", + "反馈记录: 2 条", + ]) + + assert should_send_post(report) + + +def test_build_post_payload_preserves_title_and_structure(): + payload = json.loads(build_post_payload("**结论**\n\n- 已接入 Post\n- 短句仍发 text")) + + assert payload["zh_cn"]["title"] == "结论" + rows = payload["zh_cn"]["content"] + flat = json.dumps(rows, ensure_ascii=False) + assert "已接入 Post" in flat + assert "短句仍发 text" in flat + assert rows[0][0]["text"].startswith("- 已接入") + + +def test_derive_post_title_falls_back_for_long_first_line(): + long_line = "这是一段很长很长的普通回复" * 8 + + assert derive_post_title(long_line, fallback="GA 结论") == "GA 结论" diff --git a/tests/test_feishu_task_stream.py b/tests/test_feishu_task_stream.py new file mode 100644 index 000000000..ced54eaa3 --- /dev/null +++ b/tests/test_feishu_task_stream.py @@ -0,0 +1,570 @@ +import json + +from frontends.feishu_cards import build_final_card +from frontends.feishu_task_stream import ( + FeishuTaskStream, + build_step_detail, + humanize_step_summary, + natural_group_final, + should_send_plain_final, +) + + +class _Resp: + thinking = "private chain of thought" + content = "visible output" + + +def _display_text(text): + return text or "..." + + +def _payload_text(payload): + data = json.loads(payload) + parts = [] + + post = data.get("zh_cn") + if isinstance(post, dict): + title = post.get("title") + if isinstance(title, str): + parts.append(title) + for row in post.get("content", []) or []: + for node in row or []: + if isinstance(node, dict) and isinstance(node.get("text"), str): + parts.append(node["text"]) + return "\n".join(part for part in parts if part) + + def walk(element): + if not isinstance(element, dict): + return + header = element.get("header", {}) + if isinstance(header, dict): + title = header.get("title", {}) + if isinstance(title, dict): + parts.append(title.get("content", "")) + for key in ("content", "text"): + value = element.get(key) + if isinstance(value, str): + parts.append(value) + for child in element.get("elements", []) or []: + walk(child) + + header = data.get("header", {}) + title = header.get("title", {}) if isinstance(header, dict) else {} + if isinstance(title, dict): + parts.append(title.get("content", "")) + for element in data.get("body", {}).get("elements", []): + walk(element) + return "\n".join(part for part in parts if part) + + +def test_step_detail_hides_raw_thinking_by_default(): + detail = build_step_detail( + _Resp(), + [{"tool_name": "file_read", "args": {"path": "/tmp/a.txt"}}], + _display_text, + ) + + assert "private chain of thought" not in detail + assert "### 小结" in detail + assert "本轮完成:读取 `a.txt`" in detail + assert "执行动作" in detail + assert "读取 `a.txt`" in detail + assert "visible output" in detail + assert "{\"path\"" not in detail + + +def test_step_detail_places_summary_after_turn_outputs(): + detail = build_step_detail( + _Resp(), + [{"tool_name": "file_read", "args": {"path": "/tmp/a.txt"}}], + _display_text, + tool_results=[{"status": "success", "output": "已读取 a.txt"}], + ) + + assert detail.index("### 执行动作") < detail.index("### 产物/证据") + assert detail.index("### 产物/证据") < detail.index("### 可见输出") + assert detail.index("### 可见输出") < detail.index("### 小结") + + +def test_step_detail_summarizes_tool_results_without_raw_trace(): + detail = build_step_detail( + _Resp(), + [{"tool_name": "file_patch", "args": {"path": "/tmp/USER.md", "old_content": "x" * 1200}}], + _display_text, + tool_results=[{"status": "success", "output": "patched /tmp/USER.md\n当前持仓已写入本地记忆"}], + ) + + assert "更新 `USER.md`" in detail + assert "当前结果" in detail + assert "产物/证据" in detail + assert "当前持仓已写入本地记忆" in detail + assert "old_content" not in detail + + +def test_step_detail_surfaces_error_outputs(): + detail = build_step_detail( + _Resp(), + [{"tool_name": "code_run", "args": {"script": "python scanner.py"}}], + _display_text, + tool_results=[{"status": "error", "error": "KeyError: slice(None, 800, None)"}], + ) + + assert "产物/证据" in detail + assert "KeyError" in detail + assert "调用" not in detail + + +def test_humanize_raw_tool_summary(): + summary = "调用工具file_read, args: {'path': '/Users/me/memory/USER.md'}" + + assert humanize_step_summary(summary) == "读取 `USER.md`" + + +def test_task_stream_patches_one_workspace_card_for_progress_and_final(): + sent = [] + patched = [] + + def send_raw(receive_id, payload, msg_type, rid_type): + sent.append((receive_id, payload, msg_type, rid_type)) + return f"msg-{len(sent)}" + + def patch_card(message_id, payload): + patched.append((message_id, payload)) + return True + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=send_raw, + patch_card=patch_card, + display_text=_display_text, + ) + + stream.start() + stream.step("读取配置", "### Tool Calls\n- `file_read`({})") + stream.step("运行验证", "### Tool Calls\n- `code_run`({})") + stream.done("最终结论") + + assert len(sent) == 1 + assert "任务工作台" in _payload_text(sent[0][1]) + assert "第 1 轮 · 读取配置" in _payload_text(patched[0][1]) + assert "第 2 轮 · 运行验证" in _payload_text(patched[1][1]) + assert "已完成" in _payload_text(patched[-1][1]) + assert "最终输出" in _payload_text(patched[-1][1]) + assert "最终结论" in _payload_text(patched[-1][1]) + assert patched + + +def test_workspace_card_keeps_only_recent_steps(): + sent = [] + patched = [] + + def send_raw(receive_id, payload, msg_type, rid_type): + sent.append(payload) + return "status-msg" + + def patch_card(message_id, payload): + patched.append(payload) + return True + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=send_raw, + patch_card=patch_card, + display_text=_display_text, + workspace_max_steps=2, + ) + stream.start() + for idx in range(5): + stream.step(f"步骤 {idx}", "detail") + + latest_status = _payload_text(patched[-1]) + assert "步骤 0" not in latest_status + assert "步骤 1" not in latest_status + assert "步骤 3" in latest_status + assert "步骤 4" in latest_status + + +def test_workspace_structured_final_is_sent_as_post_not_embedded_in_card(): + sent = [] + patched = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or f"msg-{len(sent)}", + patch_card=lambda message_id, payload: patched.append(payload) or True, + display_text=_display_text, + ) + + stream.start() + stream.step("修改发送通道", "detail") + stream.done("改造已完成,汇报结果:\n\n**做了什么:**\n1. 改为 post\n2. 重启验证") + + assert sent[0][1] == "interactive" + assert sent[-1][1] == "post" + assert "**做了什么:**" not in _payload_text(patched[-1]) + assert "结论已通过富文本消息发送" in _payload_text(patched[-1]) + assert "做了什么:" in _payload_text(sent[-1][0]) + assert "改为 post" in _payload_text(sent[-1][0]) + + +def test_task_stream_can_skip_initial_status_until_second_step(): + sent = [] + patched = [] + + def send_raw(receive_id, payload, msg_type, rid_type): + sent.append((receive_id, payload, msg_type, rid_type)) + return f"msg-{len(sent)}" + + def patch_card(message_id, payload): + patched.append((message_id, payload)) + return True + + stream = FeishuTaskStream( + "open-1", + "open_id", + send_raw=send_raw, + patch_card=patch_card, + display_text=_display_text, + send_initial_status=False, + ) + + stream.start() + assert sent == [] + stream.step("读取配置", "detail") + assert sent == [] + stream.step("运行验证", "detail") + assert len(sent) == 1 + assert "第 1 轮 · 读取配置" in _payload_text(sent[0][1]) + assert "第 2 轮 · 运行验证" in _payload_text(sent[0][1]) + + +def test_quiet_task_stream_sends_only_final_conclusion(): + sent = [] + patched = [] + + def send_raw(receive_id, payload, msg_type, rid_type): + sent.append((receive_id, payload, msg_type, rid_type)) + return f"msg-{len(sent)}" + + def patch_card(message_id, payload): + patched.append((message_id, payload)) + return True + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=send_raw, + patch_card=patch_card, + display_text=_display_text, + quiet=True, + ) + + stream.start() + stream.pulse("working") + stream.step("读取配置", "### Tool Calls\n- `file_read`({})") + stream.done("最终结论") + + assert len(sent) == 1 + assert "Turn" not in _payload_text(sent[0][1]) + assert "读取配置" not in _payload_text(sent[0][1]) + assert "思考流" not in _payload_text(sent[0][1]) + assert "Tool Calls" not in _payload_text(sent[0][1]) + assert "最终结论" in _payload_text(sent[0][1]) + assert not patched + + +def test_group_compact_replies_with_progress_and_plain_final(): + sent = [] + replies = [] + patched = [] + + def send_raw(receive_id, payload, msg_type, rid_type): + sent.append((receive_id, payload, msg_type, rid_type)) + return f"msg-{len(sent)}" + + def send_reply(message_id, payload, msg_type): + replies.append((message_id, payload, msg_type)) + return f"reply-{len(replies)}" + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=send_raw, + patch_card=lambda message_id, payload: patched.append((message_id, payload)) or True, + display_text=_display_text, + group_compact=True, + group_progress_after_sec=0, + reply_to="om_123", + send_reply=send_reply, + ) + + stream.start() + stream.step("查 cron 配置", "### Tool Calls\n- `file_read`({})") + stream.done("**结论**\n\n收到,DAG 报告不会再发群里。") + + assert sent == [] + assert replies[0][0] == "om_123" + assert replies[0][2] == "interactive" + assert "第 1 轮 · 查 cron 配置" in _payload_text(replies[0][1]) + assert "收到,DAG 报告不会再发群里。" in _payload_text(patched[-1][1]) + assert replies[-1][2] == "text" + assert json.loads(replies[-1][1])["text"] == "收到,DAG 报告不会再发群里。" + + +def test_terminal_workspace_ignores_late_pulse_after_done(): + sent = [] + patched = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append(payload) or "msg", + patch_card=lambda message_id, payload: patched.append(payload) or True, + display_text=_display_text, + group_compact=True, + workspace_progress_after_sec=0, + ) + + stream.step("读取配置", "detail") + stream.done("最终结论") + final_patch_count = len(patched) + stream.pulse("⏳ 工作中 · 999s") + stream.step("迟到进展", "detail") + + assert len(patched) == final_patch_count + assert "✅ 已完成" in _payload_text(patched[-1]) + assert "999s" not in _payload_text(patched[-1]) + + +def test_group_compact_replies_structured_final_as_post(): + replies = [] + + def send_reply(message_id, payload, msg_type): + replies.append((message_id, payload, msg_type)) + return f"reply-{len(replies)}" + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda *args: "raw", + patch_card=lambda *_: True, + display_text=_display_text, + group_compact=True, + reply_to="om_123", + send_reply=send_reply, + ) + + stream.done( + "\n".join([ + "**结论**", + "", + "已完成清理:", + "1. 保留短句普通回复", + "2. 长报告自动改成卡片", + "3. Dream 报告默认走卡片", + ]) + ) + + assert replies[0][2] == "post" + payload = json.loads(replies[0][1]) + assert payload["zh_cn"]["title"] == "已完成清理:" + flat = json.dumps(payload["zh_cn"]["content"], ensure_ascii=False) + assert "长报告自动改成卡片" in flat + + +def test_group_compact_replies_operational_status_as_post(): + replies = [] + + def send_reply(message_id, payload, msg_type): + replies.append((message_id, payload, msg_type)) + return f"reply-{len(replies)}" + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda *args: "raw", + patch_card=lambda *_: True, + display_text=_display_text, + group_compact=True, + reply_to="om_123", + send_reply=send_reply, + ) + + stream.done("状态稳定。Feishu 和 Weixin 在线,PID 97981。") + + assert replies[0][2] == "post" + payload = json.loads(replies[0][1]) + assert payload["zh_cn"]["title"] == "状态汇报" + assert "PID 97981" in _payload_text(replies[0][1]) + + +def test_group_compact_keeps_casual_short_reply_as_text(): + replies = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda *args: "raw", + patch_card=lambda *_: True, + display_text=_display_text, + group_compact=True, + reply_to="om_123", + send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply", + ) + + stream.done("我在,直接说。") + + assert replies[0][2] == "text" + assert json.loads(replies[0][1])["text"] == "我在,直接说。" + + +def test_private_casual_reply_stays_text_not_card(): + sent = [] + + stream = FeishuTaskStream( + "open-1", + "open_id", + send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or "msg", + patch_card=lambda *_: True, + display_text=_display_text, + ) + + stream.done("0.7 是个好数值,保持这个值挺好。") + + assert sent[0][1] == "text" + assert json.loads(sent[0][0])["text"] == "0.7 是个好数值,保持这个值挺好。" + + +def test_structured_private_reply_uses_post(): + sent = [] + + stream = FeishuTaskStream( + "open-1", + "open_id", + send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or "msg", + patch_card=lambda *_: True, + display_text=_display_text, + ) + + stream.done("结论:已修复。\n\n1. 更新输出流\n2. 重启验证") + + assert sent[0][1] == "post" + assert "更新输出流" in _payload_text(sent[0][0]) + + +def test_plain_final_classifier_keeps_reports_as_cards(): + assert should_send_plain_final("我在,直接说。") + assert not should_send_plain_final("结论:已处理") + assert not should_send_plain_final("1. 先查日志\n2. 再重启") + + +def test_final_card_surfaces_task_titles_from_output(): + payload = build_final_card( + "\n".join([ + "🎉 任务 1 & 2 全部完成!", + "", + "✅ 任务 1:盘中监控预警脚本", + "已创建。", + "", + "✅ 任务 2:K 线图可视化", + "已生成。", + ]), + title="已完成", + ) + + text = _payload_text(payload) + assert "完成清单" in text + assert "任务 1:盘中监控预警脚本" in text + assert "任务 2:K 线图可视化" in text + + +def test_workspace_card_surfaces_task_plan_from_steps_and_outputs(): + sent = [] + patched = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append(payload) or "msg", + patch_card=lambda message_id, payload: patched.append(payload) or True, + display_text=_display_text, + ) + + stream.step( + "生成盘中监控预警脚本", + build_step_detail( + _Resp(), + [{"tool_name": "file_write", "args": {"path": "/tmp/scanner_ma5_monitor.py"}}], + _display_text, + tool_results=[{"status": "success", "output": "已写入 scanner_ma5_monitor.py"}], + ), + ) + stream.step( + "绘制 K 线图", + build_step_detail( + _Resp(), + [{"tool_name": "code_run", "args": {"script": "python draw_kline.py"}}], + _display_text, + tool_results=[{"status": "success", "output": "已生成 kline_001259.png"}], + ), + ) + stream.done("任务 1:盘中监控预警脚本\n任务 2:K 线图可视化\n\n全部完成。") + + text = _payload_text(patched[-1]) + assert "完成清单" in text + assert "任务 1:盘中监控预警脚本" in text + assert "任务 2:K 线图可视化" in text + assert "执行动作" in text + assert "产物/证据" in text + assert "scanner_ma5_monitor.py" in text + + +def test_natural_group_final_strips_machine_heading(): + assert natural_group_final("**结论**\n\n搞定了") == "搞定了" + assert natural_group_final("✅ 最终结论:已处理") == "已处理" + + +def test_group_compact_delays_workspace_until_second_step(): + replies = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda *args: "raw", + patch_card=lambda *_: True, + display_text=_display_text, + group_compact=True, + reply_to="om_123", + send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply", + ) + + stream.step("刚开始处理", "detail") + assert replies == [] + stream.step("继续验证", "detail") + + assert replies + assert "第 1 轮 · 刚开始处理" in _payload_text(replies[0][1]) + assert "第 2 轮 · 继续验证" in _payload_text(replies[0][1]) + + +def test_group_compact_cancel_sends_plain_stop_message(): + replies = [] + + stream = FeishuTaskStream( + "chat-1", + "chat_id", + send_raw=lambda *args: "raw", + patch_card=lambda *_: True, + display_text=_display_text, + group_compact=True, + reply_to="om_123", + send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply", + ) + + stream.cancel("已停止当前任务") + + assert replies == [("om_123", json.dumps({"text": "已停止当前任务"}, ensure_ascii=False), "text")]