diff --git a/assets/demo/feishu_task_workbench_demo.svg b/assets/demo/feishu_task_workbench_demo.svg
new file mode 100644
index 000000000..7a591d37d
--- /dev/null
+++ b/assets/demo/feishu_task_workbench_demo.svg
@@ -0,0 +1,45 @@
+
diff --git a/frontends/feishu_cards.py b/frontends/feishu_cards.py
new file mode 100644
index 000000000..fee06d21c
--- /dev/null
+++ b/frontends/feishu_cards.py
@@ -0,0 +1,207 @@
+"""飞书交互卡片的展示层工具。
+
+这里故意只处理呈现,不改变 agent 的决策流程。目标是让长任务更好读。
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import re
+from typing import Iterable
+
+CARD_OUTPUT_ENABLED = os.environ.get("GA_FEISHU_CARD_OUTPUT", "1").lower() not in {"0", "false", "no", "off"}
+MAX_MARKDOWN_BLOCK_CHARS = int(os.environ.get("GA_FEISHU_CARD_BLOCK_CHARS", "5200") or "5200")
+MAX_WORKSPACE_DETAIL_CHARS = int(os.environ.get("GA_FEISHU_WORKSPACE_DETAIL_CHARS", "5200") or "5200")
+
+
+def markdown(content: str) -> dict[str, str]:
+ return {"tag": "markdown", "content": str(content or "")}
+
+
+def hr() -> dict[str, str]:
+ return {"tag": "hr"}
+
+
+def collapsible_panel(title: str, content: str, *, expanded: bool = False) -> dict:
+ title = _clean_summary(title, limit=120)
+ content = str(content or "_(无输出)_").strip() or "_(无输出)_"
+ if len(content) > MAX_WORKSPACE_DETAIL_CHARS:
+ content = content[:MAX_WORKSPACE_DETAIL_CHARS].rstrip() + f"\n\n...(已截断, 共 {len(content)} 字符)"
+ return {
+ "tag": "collapsible_panel",
+ "expanded": expanded,
+ "header": {"title": {"tag": "plain_text", "content": title}},
+ "elements": [markdown(content)],
+ }
+
+
+def card_raw(elements: list[dict], *, title: str = "", template: str = "blue") -> str:
+ card: dict = {
+ "schema": "2.0",
+ "config": {"streaming_mode": False, "width_mode": "fill"},
+ "body": {"elements": elements},
+ }
+ title = re.sub(r"\s+", " ", str(title or "")).strip()
+ if title:
+ card["header"] = {
+ "template": template,
+ "title": {"tag": "plain_text", "content": title[:80]},
+ }
+ return json.dumps(card, ensure_ascii=False)
+
+
+def split_markdown_blocks(text: str, *, limit: int = MAX_MARKDOWN_BLOCK_CHARS) -> list[str]:
+ text = str(text or "").strip()
+ if not text:
+ return ["_(无文本输出)_"]
+ if len(text) <= limit:
+ return [text]
+ chunks: list[str] = []
+ remaining = text
+ while len(remaining) > limit:
+ split_at = remaining.rfind("\n\n", 0, limit)
+ if split_at < limit // 2:
+ split_at = remaining.rfind("\n", 0, limit)
+ if split_at < limit // 2:
+ split_at = limit
+ chunks.append(remaining[:split_at].rstrip())
+ remaining = remaining[split_at:].lstrip()
+ if remaining:
+ chunks.append(remaining)
+ return chunks
+
+
+def _clean_summary(summary: str, *, limit: int = 90) -> str:
+ summary = re.sub(r"\s+", " ", str(summary or "")).strip()
+ if len(summary) > limit:
+ return summary[: limit - 3] + "..."
+ return summary or "继续处理"
+
+
+def _extract_task_titles(text: str, *, limit: int = 6) -> list[str]:
+ titles: list[str] = []
+ patterns = [
+ re.compile(r"^\s*(?:#{1,4}\s*)?(?:[🚀✅🎉📌💡🛠️📄📁📊]\s*)?(任务\s*\d+\s*[::].+?)\s*$", re.I),
+ re.compile(r"^\s*(?:#{1,4}\s*)?(Task\s*\d+\s*[::].+?)\s*$", re.I),
+ re.compile(r"^\s*(\d+)[.、]\s+(?:\*\*)?([^::\n]{4,80}?)(?:\*\*)?\s*[::]\s*(.+?)\s*$", re.I),
+ ]
+ for line in str(text or "").splitlines():
+ stripped = line.strip().strip("*")
+ for pattern in patterns:
+ match = pattern.match(stripped)
+ if match:
+ if len(match.groups()) >= 3 and match.group(1).isdigit():
+ title = f"任务 {match.group(1)}:{match.group(2).strip()}:{match.group(3).strip()}"
+ else:
+ title = re.sub(r"\s+", " ", match.group(1)).strip()
+ if title and title not in titles:
+ titles.append(title)
+ break
+ if len(titles) >= limit:
+ break
+ return titles
+
+
+def _task_title_from_summary(idx: int, summary: str) -> str:
+ summary = _clean_summary(summary, limit=70)
+ summary = re.sub(r"^任务\s*\d+\s*[::]\s*", "", summary)
+ summary = re.sub(r"^Turn\s*\d+\s*[::·-]\s*", "", summary, flags=re.I)
+ return f"任务 {idx}:{summary}"
+
+
+def _workflow_markdown(step_summaries: Iterable[tuple[int, str]] | None, final_text: str) -> str:
+ lines: list[str] = []
+ task_titles = _extract_task_titles(final_text)
+ if task_titles:
+ lines.append("### 完成清单")
+ lines.extend(f"{idx}. {title}" for idx, title in enumerate(task_titles, 1))
+ elif step_summaries:
+ lines.append("### 最近进展")
+ for order, (_turn, summary) in enumerate(list(step_summaries)[-6:], 1):
+ lines.append(f"{order}. {_task_title_from_summary(order, summary)}")
+ return "\n".join(lines).strip()
+
+
+def _has_output_heading(text: str) -> bool:
+ return bool(re.search(r"(?mi)^\s*#{1,6}\s*(?:Outputs?|结论|最终结论|已完成)\s*[::]?\s*$", str(text or "")))
+
+
+def build_status_card(status: str, *, elapsed: int = 0, turn_count: int = 0, step_summaries=None) -> str:
+ lines = [f"**{status or '工作中'}**", f"耗时: {elapsed}s"]
+ if turn_count:
+ lines.append(f"轮次: {turn_count}")
+ if step_summaries:
+ lines.append("")
+ lines.append("最近进展:")
+ for idx, summary in list(step_summaries):
+ lines.append(f"- 第 {idx} 轮:{_clean_summary(summary)}")
+ template = "green" if "完成" in str(status) else "blue"
+ return card_raw([markdown("\n".join(lines))], title=str(status or "工作中").replace("...", ""), template=template)
+
+
+def build_progress_card(turn: int, summary: str, detail: str = "", *, compact: bool = False) -> str:
+ summary = _clean_summary(summary, limit=120)
+ body = [f"**{summary}**"]
+ detail = str(detail or "").strip()
+ if detail:
+ body.append(detail)
+ title = f"进展 · 第 {turn} 轮"
+ return card_raw([markdown("\n\n".join(body))], title=title, template="blue")
+
+
+def build_task_workspace_card(
+ *,
+ status: str,
+ steps: Iterable[tuple[int, str, str]] | None = None,
+ final_text: str = "",
+ elapsed: int = 0,
+ turn_count: int = 0,
+ max_steps: int = 8,
+ title: str = "任务工作台",
+) -> str:
+ """生成一张持续更新的任务工作台卡片,每轮进展默认折叠。"""
+ steps = list(steps or [])
+ visible_steps = steps[-max_steps:] if max_steps > 0 else steps
+ hidden = max(0, len(steps) - len(visible_steps))
+ status_lines = ["### 状态", f"- 当前:**{status or '工作中'}**"]
+ if elapsed:
+ status_lines.append(f"- 耗时:{elapsed}s")
+ if turn_count:
+ status_lines.append(f"- 轮次:{turn_count}")
+ if hidden:
+ status_lines.append(f"- 早期进展:已折叠,保留最近 {len(visible_steps)} 轮")
+
+ elements: list[dict] = [markdown("\n".join(status_lines))]
+ workflow_source = steps if _extract_task_titles(final_text) else visible_steps
+ workflow = _workflow_markdown([(idx, summary) for idx, summary, _detail in workflow_source], final_text)
+ if workflow:
+ elements.append(markdown(workflow))
+ if final_text:
+ elements.append(markdown("### 最终输出"))
+ elements.extend(markdown(chunk) for chunk in split_markdown_blocks(final_text))
+ if visible_steps:
+ elements.append(hr())
+ elements.append(markdown("### 过程记录"))
+ for idx, summary, detail in visible_steps:
+ elements.append(collapsible_panel(f"第 {idx} 轮 · {summary}", detail))
+ template = "green" if "完成" in str(status) else ("red" if "失败" in str(status) or "错误" in str(status) else "blue")
+ return card_raw(elements, title=title, template=template)
+
+
+def build_final_card(
+ text: str,
+ *,
+ title: str = "已完成",
+ template: str = "green",
+ step_summaries: Iterable[tuple[int, str]] | None = None,
+) -> str:
+ elements: list[dict] = []
+ workflow = _workflow_markdown(step_summaries, text)
+ if workflow:
+ elements.append(markdown(workflow))
+ elements.append(hr())
+ if step_summaries and not _has_output_heading(text):
+ elements.append(markdown("### 最终输出"))
+ elements.extend(markdown(chunk) for chunk in split_markdown_blocks(text))
+ return card_raw(elements, title=title, template=template)
diff --git a/frontends/feishu_post.py b/frontends/feishu_post.py
new file mode 100644
index 000000000..ab334cb64
--- /dev/null
+++ b/frontends/feishu_post.py
@@ -0,0 +1,162 @@
+"""飞书富文本消息工具。"""
+
+from __future__ import annotations
+
+import json
+import os
+import re
+from typing import Any
+
+
+AUTO_POST_MIN_CHARS = int(os.environ.get("GA_FEISHU_POST_MIN_CHARS", "420") or "420")
+AUTO_POST_MIN_LINES = int(os.environ.get("GA_FEISHU_POST_MIN_LINES", "7") or "7")
+MAX_POST_ROWS = int(os.environ.get("GA_FEISHU_POST_MAX_ROWS", "120") or "120")
+MAX_ROW_CHARS = int(os.environ.get("GA_FEISHU_POST_MAX_ROW_CHARS", "1200") or "1200")
+
+_BOLD_RE = re.compile(r"\*\*([^*\n][^*\n]*?)\*\*")
+_HEADING_RE = re.compile(r"^\s{0,3}#{1,6}\s+(.+?)\s*$")
+_STRUCTURED_RE = re.compile(
+ r"(^|\n)\s*(#{1,6}\s+\S|[-*]\s+\S|\d+[.)]\s+\S|"
+ r"正式准入[::]|跳过[::]|证据记录[::]|反馈记录[::]|说明[::]|"
+ r"Dream\s*认知精炼报告|```|\|.+\|)",
+ re.IGNORECASE,
+)
+_OPERATIONAL_CARD_ENABLED = os.environ.get("GA_FEISHU_OPERATIONAL_CARD", "1").lower() not in {
+ "0",
+ "false",
+ "no",
+ "off",
+}
+_OPERATIONAL_RE = re.compile(
+ r"(PID|进程|Gateway|gateway|重启|已启动|已重启|连接已恢复|飞书连接|"
+ r"Feishu|Weixin|微信|平台|connected|运行正常|系统运行|状态稳定|"
+ r"验证|测试|pytest|passed|score\s*\d+|findings|push\s*成功|"
+ r"已同步|origin/main|HEAD|commit|提交|工作区干净|报错|失败)",
+ re.IGNORECASE,
+)
+
+
+def _strip_markdown(text: str) -> str:
+ text = re.sub(r"^\s{0,3}#{1,6}\s+", "", str(text or "")).strip()
+ text = text.replace("**", "").replace("`", "")
+ return re.sub(r"\s+", " ", text).strip()
+
+
+def derive_post_title(text: str, fallback: str = "GA 回复") -> str:
+ """从第一行有意义的内容里取一个稳定标题。"""
+ for line in str(text or "").splitlines():
+ title = _strip_markdown(line)
+ if not title:
+ continue
+ if len(title) > 80:
+ return fallback
+ return title
+ return fallback
+
+
+def should_send_post(text: str, *, force: bool = False) -> bool:
+ """判断回复是否更适合用富文本消息发送。"""
+ text = str(text or "").strip()
+ if not text:
+ return False
+ if force:
+ return True
+ lines = [line for line in text.splitlines() if line.strip()]
+ if len(lines) >= AUTO_POST_MIN_LINES:
+ return True
+ if _STRUCTURED_RE.search(text):
+ return True
+ return len(text) >= AUTO_POST_MIN_CHARS and len(lines) >= 3
+
+
+def should_send_operational_card(text: str) -> bool:
+ """判断短状态/短报告是否适合用卡片呈现。"""
+ if not _OPERATIONAL_CARD_ENABLED:
+ return False
+ text = str(text or "").strip()
+ if not text:
+ return False
+ return bool(_OPERATIONAL_RE.search(text))
+
+
+def derive_operational_card_title(text: str, fallback: str = "状态汇报") -> str:
+ text = str(text or "")
+ if re.search(r"重启|已启动|连接已恢复", text, re.IGNORECASE):
+ return "重启汇报"
+ if re.search(r"验证|测试|pytest|passed|score\s*\d+|findings", text, re.IGNORECASE):
+ return "验证结果"
+ if re.search(r"push|已同步|origin/main|HEAD|commit|提交|工作区干净", text, re.IGNORECASE):
+ return "同步汇报"
+ if re.search(r"报错|失败", text, re.IGNORECASE):
+ return "异常汇报"
+ return fallback
+
+
+def _text_node(text: str, *, bold: bool = False) -> dict[str, Any]:
+ node: dict[str, Any] = {"tag": "text", "text": text}
+ if bold:
+ node["style"] = ["bold"]
+ return node
+
+
+def _inline_nodes(line: str, *, bold_line: bool = False) -> list[dict[str, Any]]:
+ nodes: list[dict[str, Any]] = []
+ pos = 0
+ for match in _BOLD_RE.finditer(line):
+ if match.start() > pos:
+ nodes.append(_text_node(line[pos:match.start()], bold=bold_line))
+ nodes.append(_text_node(match.group(1), bold=True))
+ pos = match.end()
+ if pos < len(line):
+ nodes.append(_text_node(line[pos:], bold=bold_line))
+ return nodes or [_text_node(line or " ")]
+
+
+def _line_nodes(line: str, *, in_code: bool = False) -> list[dict[str, Any]]:
+ line = str(line or "")
+ if len(line) > MAX_ROW_CHARS:
+ line = line[:MAX_ROW_CHARS].rstrip() + "..."
+ if in_code:
+ return [_text_node(line or " ")]
+ heading = _HEADING_RE.match(line)
+ if heading:
+ return _inline_nodes(_strip_markdown(heading.group(1)), bold_line=True)
+ return _inline_nodes(line)
+
+
+def _content_rows(text: str) -> list[list[dict[str, Any]]]:
+ rows: list[list[dict[str, Any]]] = []
+ in_code = False
+ blank_pending = False
+ for raw_line in str(text or "").splitlines():
+ line = raw_line.rstrip()
+ if line.strip().startswith("```"):
+ in_code = not in_code
+ continue
+ if not line.strip():
+ blank_pending = bool(rows)
+ continue
+ if blank_pending and len(rows) < MAX_POST_ROWS - 1:
+ rows.append([_text_node(" ")])
+ blank_pending = False
+ rows.append(_line_nodes(line, in_code=in_code))
+ if len(rows) >= MAX_POST_ROWS:
+ rows.append([_text_node("...(内容较长,已截断)")])
+ break
+ return rows or [[_text_node("(无内容)")]]
+
+
+def build_post_payload(text: str, *, title: str | None = None) -> str:
+ """把普通文本或轻量 Markdown 转成飞书 post 消息。"""
+ text = str(text or "").strip()
+ post_title = title or derive_post_title(text)
+ lines = text.splitlines()
+ if lines and _strip_markdown(lines[0]) == post_title:
+ text = "\n".join(lines[1:]).strip()
+ payload = {
+ "zh_cn": {
+ "title": post_title,
+ "content": _content_rows(text),
+ }
+ }
+ return json.dumps(payload, ensure_ascii=False)
diff --git a/frontends/feishu_task_stream.py b/frontends/feishu_task_stream.py
new file mode 100644
index 000000000..0a9c51b29
--- /dev/null
+++ b/frontends/feishu_task_stream.py
@@ -0,0 +1,635 @@
+import json
+import os
+import re
+import threading
+import time
+
+from frontends.feishu_cards import build_progress_card, build_status_card, build_task_workspace_card
+from frontends.feishu_post import (
+ build_post_payload,
+ derive_operational_card_title,
+ derive_post_title,
+ should_send_operational_card,
+ should_send_post,
+)
+
+
+WORKSPACE_CARD_ENABLED = os.environ.get("GA_FEISHU_TASK_WORKSPACE_CARD", "1").lower() not in {
+ "0",
+ "false",
+ "no",
+ "off",
+}
+WORKSPACE_MAX_STEPS = int(os.environ.get("GA_FEISHU_WORKSPACE_MAX_STEPS", "8") or "8")
+WORKSPACE_FINAL_CHARS = int(os.environ.get("GA_FEISHU_WORKSPACE_FINAL_CHARS", "8500") or "8500")
+WORKSPACE_PROGRESS_AFTER_TURNS = int(os.environ.get("GA_FEISHU_WORKSPACE_PROGRESS_AFTER_TURNS", "2") or "2")
+WORKSPACE_PROGRESS_AFTER_SEC = float(os.environ.get("GA_FEISHU_WORKSPACE_PROGRESS_AFTER_SEC", "8") or "8")
+
+_CASUAL_FINAL_MAX_CHARS = int(os.environ.get("GA_FEISHU_CASUAL_FINAL_MAX_CHARS", "180") or "180")
+_STRUCTURED_FINAL_RE = re.compile(
+ r"(^|\n)\s*(#{1,6}\s+|[-*]\s+|\d+[.、]\s+|```|>\s+)|"
+ r"(任务\s*\d+\s*[::]|Tool Calls|Outputs?|结论|报告|汇报|状态|PID|文件位置|使用方法)",
+ re.IGNORECASE,
+)
+
+
+def split_message(text, limit=7000):
+ text = str(text or "")
+ if len(text) <= limit:
+ return [text]
+ chunks = []
+ remaining = text
+ while len(remaining) > limit:
+ split_at = remaining.rfind("\n", 0, limit)
+ if split_at < limit // 2:
+ split_at = remaining.rfind(" ", 0, limit)
+ if split_at < limit // 2:
+ split_at = limit
+ chunks.append(remaining[:split_at].rstrip())
+ remaining = remaining[split_at:].lstrip()
+ if remaining:
+ chunks.append(remaining)
+ return chunks
+
+
+def natural_group_final(text):
+ """去掉群聊结论里偏机器感的标题壳。"""
+ text = str(text or "").strip()
+ text = re.sub(r"^\s*(?:\*\*)?(?:✅\s*)?(?:结论|最终结论|完成|已完成)(?:\*\*)?\s*[::]?\s*", "", text, flags=re.IGNORECASE)
+ text = re.sub(r"^\s*[-=]{3,}\s*", "", text)
+ return text.strip() or "_(无文本输出)_"
+
+
+def should_send_plain_final(text, *, turn_count=0, group_compact=False):
+ """判断最终回复是否应该保持普通文本气泡。"""
+ raw = str(text or "").strip()
+ visible = natural_group_final(raw)
+ if not visible or len(visible) > _CASUAL_FINAL_MAX_CHARS:
+ return False
+ if turn_count:
+ return False
+ if should_send_operational_card(raw) or should_send_post(raw):
+ return False
+ if _STRUCTURED_FINAL_RE.search(raw):
+ return False
+ if group_compact:
+ return True
+ # 单聊里的短回复也不要强行卡片化,这样更像正常对话。
+ return "\n" not in visible
+
+
+def fmt_tool_call(tc, max_args=240):
+ name = tc.get("tool_name", "?")
+ args = {k: v for k, v in (tc.get("args") or {}).items() if not str(k).startswith("_")}
+ arg_text = json.dumps(args, ensure_ascii=False, default=str)
+ if len(arg_text) > max_args:
+ arg_text = arg_text[:max_args] + "..."
+ return f"- `{name}`({arg_text})"
+
+
+def _clip_line(text, *, limit=520):
+ text = re.sub(r"\s+", " ", str(text or "")).strip()
+ if len(text) > limit:
+ return text[: limit - 3].rstrip() + "..."
+ return text
+
+
+def _path_tail(path):
+ text = str(path or "").strip()
+ if not text:
+ return ""
+ parts = re.split(r"[/\\]", text)
+ return parts[-1] or text
+
+
+def _human_tool_action(tc):
+ name = str(tc.get("tool_name") or "?")
+ args = tc.get("args") or {}
+ if name in {"file_read", "read_file"}:
+ return f"读取 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`"
+ if name in {"file_patch", "apply_patch", "edit_file"}:
+ return f"更新 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`"
+ if name in {"file_write", "write_file"}:
+ return f"写入 `{_path_tail(args.get('path') or args.get('file_path')) or '文件'}`"
+ if name in {"code_run", "execute_code", "run_command"}:
+ script = args.get("script") or args.get("code") or args.get("cmd") or ""
+ return f"运行命令/脚本:{_clip_line(script, limit=120)}"
+ if name == "delegate_task":
+ tasks = args.get("tasks") or []
+ return f"并行调研 {len(tasks)} 个子任务" if isinstance(tasks, list) else "启动并行调研"
+ if name in {"learning_pipeline", "learning_asset_update"}:
+ return "沉淀学习资产"
+ if name in {"cognitive_dream", "cognitive_store", "cognitive_retrieval"}:
+ return "读取或更新认知记忆"
+ return f"调用 `{name}`"
+
+
+def _extract_result_text(value):
+ if value is None:
+ return ""
+ if isinstance(value, str):
+ return value
+ if isinstance(value, dict):
+ for key in ("summary", "msg", "message", "output", "stdout", "stderr", "error", "content", "final", "result"):
+ item = value.get(key)
+ if item:
+ if isinstance(item, (dict, list)):
+ return json.dumps(item, ensure_ascii=False, default=str)
+ return str(item)
+ status = value.get("status")
+ if status:
+ return f"状态={status}"
+ return json.dumps(value, ensure_ascii=False, default=str)
+ if isinstance(value, list):
+ if not value:
+ return ""
+ snippets = [_extract_result_text(item) for item in value[:3]]
+ return ";".join(s for s in snippets if s)
+ return str(value)
+
+
+def _human_tool_result(result):
+ text = _extract_result_text(result)
+ text = re.sub(r"[\s\S]*?", "", text, flags=re.IGNORECASE).strip()
+ text = re.sub(r"```[\s\S]{900,}?```", "[长代码块已省略]", text)
+ text = re.sub(r"\{\\?\"[^\\n]{900,}", "[结构化输出已省略]", text)
+ lines = [line.strip() for line in text.splitlines() if line.strip()]
+ if not lines:
+ return ""
+ useful = []
+ for line in lines:
+ if line.startswith("[Info] Final response"):
+ continue
+ if line.startswith("[Action]") or line.startswith("[Status]") or line.startswith("[Warn]"):
+ useful.append(_clip_line(line, limit=360))
+ continue
+ if len(line) < 220 or any(token in line for token in ("已", "完成", "失败", "错误", "保存", "路径", "写入", "读取")):
+ useful.append(_clip_line(line, limit=360))
+ if len(useful) >= 3:
+ break
+ return "\n".join(f"- {item}" for item in useful[:3])
+
+
+def _strip_list_prefix(text):
+ return re.sub(r"^\s*[-*]\s*", "", str(text or "")).strip()
+
+
+def build_visible_self_talk(summary, tool_calls=None, tool_results=None, content=""):
+ """生成给用户看的单轮小结。
+
+ 这不是原始思维链,而是说明本轮做了什么、留下了什么证据、下一步怎么走。
+ """
+ summary = _clip_line(summary, limit=180)
+ actions = [_human_tool_action(tc) for tc in (tool_calls or [])[:2]]
+ result_lines = []
+ for result in tool_results or []:
+ text = _human_tool_result(result)
+ if not text:
+ continue
+ for line in text.splitlines():
+ clean = _strip_list_prefix(line)
+ if clean:
+ result_lines.append(clean)
+ if len(result_lines) >= 2:
+ break
+ if len(result_lines) >= 2:
+ break
+ visible_content = _clip_line(content, limit=220)
+
+ finished = summary or "整理当前信息"
+ result = ""
+ if actions:
+ finished = ";".join(actions)
+
+ if result_lines:
+ result = ";".join(result_lines)
+ elif visible_content and visible_content != "...":
+ result = visible_content
+ else:
+ result = "本轮暂无可见产物,已更新阶段判断。"
+
+ if actions:
+ next_step = "基于这些结果继续推进下一轮。"
+ elif visible_content and visible_content != "...":
+ next_step = "等待用户反馈或按当前输出继续展开。"
+ else:
+ next_step = "继续补齐证据后再给结论。"
+
+ lines = [
+ f"- 本轮完成:{finished}",
+ f"- 当前结果:{result}",
+ f"- 下一步:{next_step}",
+ ]
+ return "### 小结\n" + "\n".join(lines)
+
+
+def humanize_step_summary(summary, tool_calls=None):
+ summary = re.sub(r"\s+", " ", str(summary or "")).strip()
+ match = re.match(r"调用工具\s*([A-Za-z_][\w.-]*)\s*,?\s*args:\s*(.+)$", summary)
+ if match:
+ name = match.group(1)
+ args_text = match.group(2)
+ if tool_calls:
+ return _human_tool_action(tool_calls[0])
+ if name in {"file_read", "read_file"}:
+ path_match = re.search(r"'path': '([^']+)'|\"path\": \"([^\"]+)\"", args_text)
+ return f"读取 `{_path_tail((path_match.group(1) or path_match.group(2)) if path_match else '') or '文件'}`"
+ if name in {"file_patch", "apply_patch", "edit_file"}:
+ path_match = re.search(r"'path': '([^']+)'|\"path\": \"([^\"]+)\"", args_text)
+ return f"更新 `{_path_tail((path_match.group(1) or path_match.group(2)) if path_match else '') or '文件'}`"
+ if name == "delegate_task":
+ return "并行调研多个子任务"
+ return f"调用 `{name}`"
+ return _clip_line(summary, limit=120)
+
+
+def build_step_detail(resp, tool_calls, display_text, *, tool_results=None, include_raw_thinking=False, detail_limit=6000):
+ """生成一轮可见进展。
+
+ 默认不展示原始思维链。可见的“思考流”只保留操作痕迹:本轮摘要、动作、产物和用户可见输出。
+ """
+ parts = []
+ thinking = (getattr(resp, "thinking", "") or "").strip() if resp else ""
+ content = display_text((getattr(resp, "content", "") or "")).strip() if resp else ""
+ turn_title = humanize_step_summary("", tool_calls) if tool_calls else ""
+ turn_summary = build_visible_self_talk(
+ turn_title,
+ tool_calls,
+ tool_results=tool_results,
+ content=content,
+ )
+ if include_raw_thinking and thinking:
+ parts.append(f"### 原始思考\n{thinking}")
+ if turn_title:
+ parts.append(f"### 本轮完成\n{turn_title}")
+ if tool_calls:
+ raw_trace = os.environ.get("GA_FEISHU_RAW_TOOL_TRACE", "").lower() in {"1", "true", "yes"}
+ if raw_trace:
+ parts.append("### 执行动作\n" + "\n".join(fmt_tool_call(tc) for tc in tool_calls))
+ else:
+ lines = []
+ for tc in tool_calls:
+ name = str(tc.get("tool_name") or "?")
+ lines.append(f"- `{name}`: {_human_tool_action(tc)}")
+ parts.append("### 执行动作\n" + "\n".join(lines))
+ result_lines = []
+ for result in tool_results or []:
+ line = _human_tool_result(result)
+ if line:
+ result_lines.append(line)
+ if len(result_lines) >= 3:
+ break
+ if result_lines:
+ parts.append("### 产物/证据\n" + "\n".join(result_lines))
+ if content and content != "...":
+ parts.append(f"### 可见输出\n{content}")
+ # 小结是本轮收口,放在动作、产物和可见输出之后更自然。
+ parts.append(turn_summary)
+ detail = "\n\n".join(parts).strip() or "_(无可见输出)_"
+ if len(detail) > detail_limit:
+ detail = detail[:detail_limit] + f"\n\n...(已截断, 共 {len(detail)} 字符)"
+ return detail
+
+
+class FeishuTaskStream:
+ """飞书长任务流。
+
+ 长任务应该像一个持续更新的工作台,而不是散落一地的进度消息。
+ 默认只更新同一张卡片,每轮过程折叠;短群聊回复仍然走自然文本。
+ """
+
+ def __init__(
+ self,
+ receive_id,
+ rid_type,
+ *,
+ send_raw,
+ patch_card,
+ display_text,
+ detail_limit=6000,
+ final_chunk_limit=7000,
+ status_tail=5,
+ quiet=False,
+ group_compact=False,
+ group_progress_after_sec=12,
+ send_initial_status=True,
+ reply_to=None,
+ send_reply=None,
+ workspace_card=WORKSPACE_CARD_ENABLED,
+ workspace_max_steps=WORKSPACE_MAX_STEPS,
+ workspace_final_chars=WORKSPACE_FINAL_CHARS,
+ workspace_progress_after_turns=WORKSPACE_PROGRESS_AFTER_TURNS,
+ workspace_progress_after_sec=None,
+ ):
+ self.rid = receive_id
+ self.rtype = rid_type
+ self._send_raw = send_raw
+ self._patch_card = patch_card
+ self._display_text = display_text
+ self.detail_limit = detail_limit
+ self.final_chunk_limit = final_chunk_limit
+ self.status_tail = status_tail
+ self.status = "🤔 思考中..."
+ self.msg_id = None
+ self.started_at = time.time()
+ self.turn_count = 0
+ self.step_summaries = []
+ self.final_message_ids = []
+ self.quiet = quiet
+ self.group_compact = group_compact
+ self.group_progress_after_sec = group_progress_after_sec
+ self.send_initial_status = send_initial_status
+ self.reply_to = reply_to
+ self._send_reply = send_reply
+ self.workspace_card = bool(workspace_card and not quiet)
+ self.workspace_max_steps = workspace_max_steps
+ self.workspace_final_chars = workspace_final_chars
+ self.workspace_progress_after_turns = max(1, int(workspace_progress_after_turns or 1))
+ if workspace_progress_after_sec is None:
+ workspace_progress_after_sec = group_progress_after_sec if group_compact else WORKSPACE_PROGRESS_AFTER_SEC
+ self.workspace_progress_after_sec = max(0.0, float(workspace_progress_after_sec or 0.0))
+ self.steps = []
+ self._lock = threading.RLock()
+ self._terminal = False
+
+ def _send_message(self, payload, msg_type="interactive"):
+ if self.reply_to and self._send_reply:
+ return self._send_reply(self.reply_to, payload, msg_type)
+ return self._send_raw(self.rid, payload, msg_type, self.rtype)
+
+ def _send_text_fallback(self, text):
+ msg_id = self._send_message(
+ json.dumps({"text": str(text or "_(无文本输出)_")}, ensure_ascii=False),
+ "text",
+ )
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ return msg_id
+
+ def _send_plain_chunks(self, text, *, limit=3500):
+ sent = 0
+ for chunk in split_message(str(text or "_(无文本输出)_"), min(self.final_chunk_limit, limit)):
+ msg_id = self._send_message(
+ json.dumps({"text": chunk}, ensure_ascii=False),
+ "text",
+ )
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ sent += 1
+ return sent
+
+ def _send_post_chunks(self, text, *, title="结论"):
+ sent = 0
+ chunks = split_message(str(text or "_(无文本输出)_"), self.final_chunk_limit)
+ total = len(chunks)
+ for idx, chunk in enumerate(chunks, 1):
+ chunk_title = title if total == 1 else f"{title} ({idx}/{total})"
+ msg_id = self._send_message(build_post_payload(chunk, title=chunk_title), "post")
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ sent += 1
+ return sent
+
+ def _final_should_post(self, text):
+ return should_send_operational_card(text) or should_send_post(text)
+
+ def _final_post_title(self, text, *, fallback="结论"):
+ if should_send_operational_card(text):
+ return derive_operational_card_title(text)
+ return derive_post_title(text, fallback=fallback)
+
+ def _status_card(self):
+ elapsed = int(time.time() - self.started_at)
+ return build_status_card(
+ self.status,
+ elapsed=elapsed,
+ turn_count=self.turn_count,
+ step_summaries=self.step_summaries[-self.status_tail:],
+ )
+
+ def _push_status(self):
+ payload = self._status_card()
+ if self.msg_id:
+ ok = self._patch_card(self.msg_id, payload)
+ if ok:
+ return
+ self.msg_id = self._send_raw(self.rid, payload, "interactive", self.rtype)
+
+ def _workspace_card(self, final_text=""):
+ elapsed = int(time.time() - self.started_at)
+ return build_task_workspace_card(
+ status=self.status,
+ steps=self.steps,
+ final_text=final_text,
+ elapsed=elapsed,
+ turn_count=self.turn_count,
+ max_steps=self.workspace_max_steps,
+ )
+
+ def _push_workspace(self, final_text=""):
+ payload = self._workspace_card(final_text=final_text)
+ if self.msg_id:
+ ok = self._patch_card(self.msg_id, payload)
+ if ok:
+ return self.msg_id
+ self.msg_id = self._send_message(payload, "interactive")
+ return self.msg_id
+
+ def _should_show_workspace(self):
+ if self.msg_id:
+ return True
+ if self.turn_count >= self.workspace_progress_after_turns:
+ return True
+ return (time.time() - self.started_at) >= self.workspace_progress_after_sec
+
+ def start(self):
+ with self._lock:
+ if self._terminal or self.quiet or self.group_compact or not self.send_initial_status:
+ return
+ if self.workspace_card:
+ self._push_workspace()
+ else:
+ self._push_status()
+
+ def pulse(self, status):
+ with self._lock:
+ if self._terminal or self.quiet:
+ return
+ self.status = status
+ if self.workspace_card:
+ self._push_workspace()
+ elif not self.group_compact:
+ self._push_status()
+
+ def step(self, summary, detail=""):
+ with self._lock:
+ if self._terminal:
+ return
+ self.turn_count += 1
+ summary = re.sub(r"\s+", " ", str(summary or f"第 {self.turn_count} 轮")).strip()
+ if len(summary) > 120:
+ summary = summary[:117] + "..."
+ self.step_summaries.append((self.turn_count, summary))
+ self.status = f"⏳ 工作中 · 第 {self.turn_count} 轮"
+ self.steps.append((self.turn_count, summary, str(detail or "_(无输出)_")))
+
+ if self.quiet:
+ # 群聊里先安静工作,最后给结论,避免刷屏。
+ return
+ if self.workspace_card:
+ if self._should_show_workspace():
+ self._push_workspace()
+ return
+ if self.group_compact:
+ elapsed = time.time() - self.started_at
+ if elapsed < self.group_progress_after_sec and self.turn_count <= 1:
+ return
+ detail = str(detail or "")
+ tool_lines = []
+ for line in detail.splitlines():
+ if line.strip().startswith("- `"):
+ tool_lines.append(line.strip())
+ if len(tool_lines) >= 3:
+ break
+ lines = [f"**进展 · 第 {self.turn_count} 轮**", summary]
+ if tool_lines:
+ lines.append("")
+ lines.extend(tool_lines)
+ content = "\n".join(lines)
+ for chunk in split_message(content, min(self.final_chunk_limit, 3500)):
+ self._send_message(
+ build_progress_card(self.turn_count, summary, chunk, compact=True),
+ "interactive",
+ )
+ return
+
+ # 单聊完整模式
+ detail = str(detail or "_(无输出)_")
+ if len(detail) > self.detail_limit:
+ detail = detail[:self.detail_limit] + f"\n\n...(已截断, 共 {len(detail)} 字符)"
+ content = f"**摘要**: {summary}\n\n{detail}"
+ for chunk in split_message(content, self.final_chunk_limit):
+ self._send_message(
+ build_progress_card(self.turn_count, summary, chunk),
+ "interactive",
+ )
+ self._push_status()
+
+ def done(self, text):
+ with self._lock:
+ self._terminal = True
+ visible = self._display_text(text)
+ if visible == "...":
+ visible = "_(无文本输出)_"
+ print(
+ f"[feishu-task-stream] final: chars={len(visible)} turns={self.turn_count} "
+ f"group_compact={self.group_compact} workspace={self.workspace_card}",
+ flush=True,
+ )
+ if should_send_plain_final(visible, turn_count=self.turn_count, group_compact=self.group_compact):
+ msg_id = self._send_message(
+ json.dumps({"text": natural_group_final(visible)}, ensure_ascii=False),
+ "text",
+ )
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ else:
+ self._send_text_fallback(natural_group_final(visible))
+ self.status = "✅ 已完成"
+ return
+ if self.workspace_card and self.msg_id:
+ visible = natural_group_final(visible) if self.group_compact else visible
+ if len(visible) <= self.workspace_final_chars:
+ self.status = "✅ 已完成"
+ final_should_post = self._final_should_post(visible)
+ final_text = "结论已通过富文本消息发送。" if final_should_post else visible
+ if not self._push_workspace(final_text=final_text) and not self.group_compact:
+ self._send_text_fallback(visible)
+ if final_should_post:
+ if not self._send_post_chunks(visible, title=self._final_post_title(visible)):
+ self._send_text_fallback(visible)
+ elif self.group_compact:
+ self._send_plain_chunks(visible)
+ if self.group_compact:
+ self.status = "✅ 已完成"
+ return
+ self.status = "✅ 已完成 · 结论另发"
+ if not self._push_workspace(final_text="结论内容较长,已拆成后续消息发送。"):
+ print("[feishu-task-stream] workspace card send failed before final chunks", flush=True)
+ if self.group_compact:
+ if not self._send_post_chunks(visible, title=self._final_post_title(visible)):
+ self._send_text_fallback(visible)
+ return
+ if not self._send_post_chunks(visible, title=self._final_post_title(visible)):
+ self._send_text_fallback(visible)
+ return
+ if self.group_compact:
+ visible = natural_group_final(visible)
+ operational_card = should_send_operational_card(visible)
+ use_post = operational_card or should_send_post(visible)
+ chunks = split_message(visible, self.final_chunk_limit if use_post else min(self.final_chunk_limit, 3500))
+ card_title = (
+ derive_operational_card_title(visible)
+ if operational_card
+ else derive_post_title(visible, fallback="已完成")
+ )
+ total = len(chunks)
+ for idx, chunk in enumerate(chunks, 1):
+ if use_post:
+ title = card_title if total == 1 else f"{card_title} ({idx}/{total})"
+ msg_id = self._send_message(
+ build_post_payload(chunk, title=title),
+ "post",
+ )
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ continue
+ msg_id = self._send_message(
+ json.dumps({"text": chunk}, ensure_ascii=False),
+ "text",
+ )
+ if msg_id:
+ self.final_message_ids.append(msg_id)
+ if not self.final_message_ids:
+ self._send_text_fallback(visible)
+ self.status = "✅ 已完成"
+ return
+ title = "结论" if self.quiet else self._final_post_title(visible, fallback="已完成")
+ self._send_post_chunks(visible, title=title)
+ if not self.final_message_ids:
+ self._send_text_fallback(visible)
+ self.status = "✅ 已完成 · 结论已发送"
+ if not self.quiet:
+ self._push_status()
+
+ def fail(self, msg):
+ with self._lock:
+ self._terminal = True
+ if self.workspace_card and self.msg_id:
+ self.status = f"❌ {msg}"
+ self._push_workspace()
+ return
+ if self.quiet or self.group_compact:
+ self._send_message(
+ json.dumps({"text": f"出错了:{msg}"}, ensure_ascii=False),
+ "text",
+ )
+ return
+ self.status = f"❌ {msg}"
+ self._push_status()
+
+ def cancel(self, msg="已停止当前任务"):
+ with self._lock:
+ self._terminal = True
+ if self.workspace_card and self.msg_id:
+ self.status = f"⏹️ {msg}"
+ self._push_workspace()
+ return
+ if self.quiet or self.group_compact:
+ self._send_message(
+ json.dumps({"text": str(msg or "已停止当前任务")}, ensure_ascii=False),
+ "text",
+ )
+ return
+ self.status = f"⏹️ {msg}"
+ self._push_status()
diff --git a/frontends/fsapp.py b/frontends/fsapp.py
index ad959f660..ef3d3d869 100644
--- a/frontends/fsapp.py
+++ b/frontends/fsapp.py
@@ -80,6 +80,11 @@ def _ensure_runtime_paths():
_ensure_runtime_paths()
from agentmain import GeneraticAgent
from frontends.chatapp_common import AgentChatMixin, FILE_HINT, split_text
+from frontends.feishu_task_stream import (
+ FeishuTaskStream,
+ build_step_detail as build_stream_step_detail,
+ humanize_step_summary,
+)
_TAG_PATS = [r"<" + t + r">.*?" + t + r">" for t in ("thinking", "summary", "tool_use", "file_content")]
_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".ico", ".tiff", ".tif"}
@@ -619,91 +624,12 @@ def _fmt_tool_call(tc):
def _build_step_detail(resp, tool_calls):
- """从 LLM response + tool_calls 组装单步展开详情(纯函数)。"""
- parts = []
- thinking = (getattr(resp, 'thinking', '') or '').strip() if resp else ''
- if thinking:
- parts.append(f"### 💭 Thinking\n{thinking}")
- if tool_calls:
- parts.append("### 🛠 Tool Calls\n" + "\n".join(_fmt_tool_call(tc) for tc in tool_calls))
- content = _display_text((getattr(resp, 'content', '') or '')).strip() if resp else ''
- if content and content != '...':
- parts.append(f"### 📝 Output\n{content}")
- return "\n\n".join(parts)
-
-
-class _TaskCard:
- """飞书任务卡片:单卡片持续 patch;每步一个独立折叠面板(header 显示 summary,展开看详情)。"""
- _DETAIL_LIMIT = 8000
-
- def __init__(self, receive_id, rid_type):
- self.rid, self.rtype = receive_id, rid_type
- self.steps = [] # [(summary, detail), ...]
- self.status = "🤔 思考中..."
- self.final = None
- self.msg_id = None
- self.start_fallback_sent = False
- self.final_fallback_sent = False
-
- def _step_panel(self, idx, summary, detail):
- detail = detail or "_(无输出)_"
- if len(detail) > self._DETAIL_LIMIT:
- detail = detail[:self._DETAIL_LIMIT] + f"\n\n…(已截断,共 {len(detail)} 字符)"
- return {
- "tag": "collapsible_panel", "expanded": False,
- "header": {"title": {"tag": "plain_text", "content": f"Turn {idx} · {summary}"}},
- "elements": [{"tag": "markdown", "content": detail}],
- }
-
- def _build(self):
- els = [{"tag": "markdown", "content": f"**{self.status}**"}]
- for i, (s, d) in enumerate(self.steps, 1):
- els.append(self._step_panel(i, s, d))
- if self.final:
- els += [{"tag": "hr"}, {"tag": "markdown", "content": self.final}]
- return _card_raw(els)
-
- def _push(self):
- card = self._build()
- if self.msg_id:
- ok = _patch_card(self.msg_id, card)
- else:
- self.msg_id = _send_raw(self.rid, card, "interactive", self.rtype)
- ok = bool(self.msg_id)
- return ok
-
- def _fallback_text(self, text, *, final=False):
- attr = "final_fallback_sent" if final else "start_fallback_sent"
- if getattr(self, attr):
- return
- setattr(self, attr, True)
- send_message(self.rid, text, receive_id_type=self.rtype)
-
- # ── 公开接口 ──
-
- def start(self):
- if not self._push():
- self._fallback_text("🤔 思考中...")
-
- def step(self, summary, detail=""):
- self.steps.append((summary, detail))
- self.status = f"⏳ 工作中 · Turn {len(self.steps)}"
- self._push()
-
- def done(self, text):
- self.status = "✅ 已完成"
- self.final = text or "_(无文本输出)_"
- if not self._push():
- self._fallback_text(_display_text(text), final=True)
-
- def fail(self, msg):
- self.status = f"❌ {msg}"
- if not self._push():
- self._fallback_text(f"❌ {msg}", final=True)
+ """从 LLM response + tool_calls 组装用户可读的单步详情。"""
+ return build_stream_step_detail(resp, tool_calls or [], _display_text)
-def _make_task_hook(card, task_id, on_final):
- """飞书任务 hook:每轮 patch 卡片状态;结束触发 on_final(raw) 处理附件。"""
+def _make_task_hook(task_stream, task_id, on_final):
+ """飞书任务 hook:每轮更新同一个任务工作台;结束后处理附件。"""
def hook(ctx):
try:
parent = getattr(ctx.get("self"), "parent", None)
@@ -714,8 +640,15 @@ def hook(ctx):
raw = resp.content if hasattr(resp, 'content') else str(resp)
on_final(raw)
elif ctx.get('summary'):
- detail = _build_step_detail(ctx.get('response'), ctx.get('tool_calls') or [])
- card.step(ctx['summary'], detail)
+ tool_calls = ctx.get('tool_calls') or []
+ detail = build_stream_step_detail(
+ ctx.get('response'),
+ tool_calls,
+ _display_text,
+ tool_results=ctx.get('tool_results') or [],
+ )
+ summary = humanize_step_summary(ctx.get('summary') or "", tool_calls)
+ task_stream.step(summary, detail)
except Exception as e:
print(f"[fs hook] error: {e}")
return hook
@@ -744,7 +677,15 @@ async def run_agent(self, chat_id, text, *, receive_id=None, receive_id_type="op
rid = receive_id or chat_id
task_id = f"{chat_id}_{uuid.uuid4().hex}"
hook_key = f"fs_{task_id}"
- card = _TaskCard(rid, receive_id_type)
+ task_stream = FeishuTaskStream(
+ rid,
+ receive_id_type,
+ send_raw=_send_raw,
+ patch_card=_patch_card,
+ display_text=_display_text,
+ group_compact=(receive_id_type == "chat_id"),
+ send_initial_status=False,
+ )
result = {"raw": None, "sent": False}
finish_lock = threading.Lock()
@@ -754,17 +695,18 @@ def _finish(raw):
return
result["raw"] = raw
result["sent"] = True
- card.done(_display_text(raw))
+ task_stream.done(raw)
_send_generated_files(rid, raw, receive_id_type=receive_id_type)
try:
- await asyncio.to_thread(card.start)
+ await asyncio.to_thread(task_stream.start)
if not hasattr(self.agent, '_turn_end_hooks'):
self.agent._turn_end_hooks = {}
- self.agent._turn_end_hooks[hook_key] = _make_task_hook(card, task_id, _finish)
+ self.agent._turn_end_hooks[hook_key] = _make_task_hook(task_stream, task_id, _finish)
self.agent._fs_active_task_id = task_id
dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source, images=images or None)
start = time.time()
+ last_pulse = 0
while state["running"] and not result["sent"]:
try:
item = await asyncio.to_thread(dq.get, True, 1)
@@ -773,16 +715,21 @@ def _finish(raw):
if item and "done" in item:
await asyncio.to_thread(_finish, item.get("done", ""))
break
+ if item and item.get("next"):
+ elapsed = int(time.time() - start)
+ if elapsed - last_pulse >= 15:
+ await asyncio.to_thread(task_stream.pulse, f"⏳ 工作中 · {elapsed}s")
+ last_pulse = elapsed
if time.time() - start > AGENT_TIMEOUT_SEC:
self.agent.abort()
- await asyncio.to_thread(card.fail, "任务超时")
+ await asyncio.to_thread(task_stream.fail, "任务超时,已停止当前任务")
break
if not state["running"] and not result["sent"]:
self.agent.abort()
- await asyncio.to_thread(card.fail, "已停止")
+ await asyncio.to_thread(task_stream.cancel, "已停止当前任务")
except Exception as e:
traceback.print_exc()
- await asyncio.to_thread(card.fail, f"错误: {e}")
+ await asyncio.to_thread(task_stream.fail, f"错误: {e}")
finally:
if getattr(self.agent, "_fs_active_task_id", None) == task_id:
try:
diff --git a/tests/test_feishu_post.py b/tests/test_feishu_post.py
new file mode 100644
index 000000000..f7e3f7bb9
--- /dev/null
+++ b/tests/test_feishu_post.py
@@ -0,0 +1,39 @@
+import json
+
+from frontends.feishu_post import build_post_payload, derive_post_title, should_send_post
+
+
+def test_short_casual_reply_stays_text():
+ assert not should_send_post("可以")
+ assert not should_send_post("收到,我来处理。")
+
+
+def test_structured_report_uses_post():
+ report = "\n".join([
+ "Dream 认知精炼报告 | 2026-05-15",
+ "",
+ "旁路复盘: 1 条",
+ "1. [L2] 飞书对话: 当天群聊应进入强记忆。",
+ "",
+ "证据记录: 8 条",
+ "反馈记录: 2 条",
+ ])
+
+ assert should_send_post(report)
+
+
+def test_build_post_payload_preserves_title_and_structure():
+ payload = json.loads(build_post_payload("**结论**\n\n- 已接入 Post\n- 短句仍发 text"))
+
+ assert payload["zh_cn"]["title"] == "结论"
+ rows = payload["zh_cn"]["content"]
+ flat = json.dumps(rows, ensure_ascii=False)
+ assert "已接入 Post" in flat
+ assert "短句仍发 text" in flat
+ assert rows[0][0]["text"].startswith("- 已接入")
+
+
+def test_derive_post_title_falls_back_for_long_first_line():
+ long_line = "这是一段很长很长的普通回复" * 8
+
+ assert derive_post_title(long_line, fallback="GA 结论") == "GA 结论"
diff --git a/tests/test_feishu_task_stream.py b/tests/test_feishu_task_stream.py
new file mode 100644
index 000000000..ced54eaa3
--- /dev/null
+++ b/tests/test_feishu_task_stream.py
@@ -0,0 +1,570 @@
+import json
+
+from frontends.feishu_cards import build_final_card
+from frontends.feishu_task_stream import (
+ FeishuTaskStream,
+ build_step_detail,
+ humanize_step_summary,
+ natural_group_final,
+ should_send_plain_final,
+)
+
+
+class _Resp:
+ thinking = "private chain of thought"
+ content = "visible output"
+
+
+def _display_text(text):
+ return text or "..."
+
+
+def _payload_text(payload):
+ data = json.loads(payload)
+ parts = []
+
+ post = data.get("zh_cn")
+ if isinstance(post, dict):
+ title = post.get("title")
+ if isinstance(title, str):
+ parts.append(title)
+ for row in post.get("content", []) or []:
+ for node in row or []:
+ if isinstance(node, dict) and isinstance(node.get("text"), str):
+ parts.append(node["text"])
+ return "\n".join(part for part in parts if part)
+
+ def walk(element):
+ if not isinstance(element, dict):
+ return
+ header = element.get("header", {})
+ if isinstance(header, dict):
+ title = header.get("title", {})
+ if isinstance(title, dict):
+ parts.append(title.get("content", ""))
+ for key in ("content", "text"):
+ value = element.get(key)
+ if isinstance(value, str):
+ parts.append(value)
+ for child in element.get("elements", []) or []:
+ walk(child)
+
+ header = data.get("header", {})
+ title = header.get("title", {}) if isinstance(header, dict) else {}
+ if isinstance(title, dict):
+ parts.append(title.get("content", ""))
+ for element in data.get("body", {}).get("elements", []):
+ walk(element)
+ return "\n".join(part for part in parts if part)
+
+
+def test_step_detail_hides_raw_thinking_by_default():
+ detail = build_step_detail(
+ _Resp(),
+ [{"tool_name": "file_read", "args": {"path": "/tmp/a.txt"}}],
+ _display_text,
+ )
+
+ assert "private chain of thought" not in detail
+ assert "### 小结" in detail
+ assert "本轮完成:读取 `a.txt`" in detail
+ assert "执行动作" in detail
+ assert "读取 `a.txt`" in detail
+ assert "visible output" in detail
+ assert "{\"path\"" not in detail
+
+
+def test_step_detail_places_summary_after_turn_outputs():
+ detail = build_step_detail(
+ _Resp(),
+ [{"tool_name": "file_read", "args": {"path": "/tmp/a.txt"}}],
+ _display_text,
+ tool_results=[{"status": "success", "output": "已读取 a.txt"}],
+ )
+
+ assert detail.index("### 执行动作") < detail.index("### 产物/证据")
+ assert detail.index("### 产物/证据") < detail.index("### 可见输出")
+ assert detail.index("### 可见输出") < detail.index("### 小结")
+
+
+def test_step_detail_summarizes_tool_results_without_raw_trace():
+ detail = build_step_detail(
+ _Resp(),
+ [{"tool_name": "file_patch", "args": {"path": "/tmp/USER.md", "old_content": "x" * 1200}}],
+ _display_text,
+ tool_results=[{"status": "success", "output": "patched /tmp/USER.md\n当前持仓已写入本地记忆"}],
+ )
+
+ assert "更新 `USER.md`" in detail
+ assert "当前结果" in detail
+ assert "产物/证据" in detail
+ assert "当前持仓已写入本地记忆" in detail
+ assert "old_content" not in detail
+
+
+def test_step_detail_surfaces_error_outputs():
+ detail = build_step_detail(
+ _Resp(),
+ [{"tool_name": "code_run", "args": {"script": "python scanner.py"}}],
+ _display_text,
+ tool_results=[{"status": "error", "error": "KeyError: slice(None, 800, None)"}],
+ )
+
+ assert "产物/证据" in detail
+ assert "KeyError" in detail
+ assert "调用" not in detail
+
+
+def test_humanize_raw_tool_summary():
+ summary = "调用工具file_read, args: {'path': '/Users/me/memory/USER.md'}"
+
+ assert humanize_step_summary(summary) == "读取 `USER.md`"
+
+
+def test_task_stream_patches_one_workspace_card_for_progress_and_final():
+ sent = []
+ patched = []
+
+ def send_raw(receive_id, payload, msg_type, rid_type):
+ sent.append((receive_id, payload, msg_type, rid_type))
+ return f"msg-{len(sent)}"
+
+ def patch_card(message_id, payload):
+ patched.append((message_id, payload))
+ return True
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=send_raw,
+ patch_card=patch_card,
+ display_text=_display_text,
+ )
+
+ stream.start()
+ stream.step("读取配置", "### Tool Calls\n- `file_read`({})")
+ stream.step("运行验证", "### Tool Calls\n- `code_run`({})")
+ stream.done("最终结论")
+
+ assert len(sent) == 1
+ assert "任务工作台" in _payload_text(sent[0][1])
+ assert "第 1 轮 · 读取配置" in _payload_text(patched[0][1])
+ assert "第 2 轮 · 运行验证" in _payload_text(patched[1][1])
+ assert "已完成" in _payload_text(patched[-1][1])
+ assert "最终输出" in _payload_text(patched[-1][1])
+ assert "最终结论" in _payload_text(patched[-1][1])
+ assert patched
+
+
+def test_workspace_card_keeps_only_recent_steps():
+ sent = []
+ patched = []
+
+ def send_raw(receive_id, payload, msg_type, rid_type):
+ sent.append(payload)
+ return "status-msg"
+
+ def patch_card(message_id, payload):
+ patched.append(payload)
+ return True
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=send_raw,
+ patch_card=patch_card,
+ display_text=_display_text,
+ workspace_max_steps=2,
+ )
+ stream.start()
+ for idx in range(5):
+ stream.step(f"步骤 {idx}", "detail")
+
+ latest_status = _payload_text(patched[-1])
+ assert "步骤 0" not in latest_status
+ assert "步骤 1" not in latest_status
+ assert "步骤 3" in latest_status
+ assert "步骤 4" in latest_status
+
+
+def test_workspace_structured_final_is_sent_as_post_not_embedded_in_card():
+ sent = []
+ patched = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or f"msg-{len(sent)}",
+ patch_card=lambda message_id, payload: patched.append(payload) or True,
+ display_text=_display_text,
+ )
+
+ stream.start()
+ stream.step("修改发送通道", "detail")
+ stream.done("改造已完成,汇报结果:\n\n**做了什么:**\n1. 改为 post\n2. 重启验证")
+
+ assert sent[0][1] == "interactive"
+ assert sent[-1][1] == "post"
+ assert "**做了什么:**" not in _payload_text(patched[-1])
+ assert "结论已通过富文本消息发送" in _payload_text(patched[-1])
+ assert "做了什么:" in _payload_text(sent[-1][0])
+ assert "改为 post" in _payload_text(sent[-1][0])
+
+
+def test_task_stream_can_skip_initial_status_until_second_step():
+ sent = []
+ patched = []
+
+ def send_raw(receive_id, payload, msg_type, rid_type):
+ sent.append((receive_id, payload, msg_type, rid_type))
+ return f"msg-{len(sent)}"
+
+ def patch_card(message_id, payload):
+ patched.append((message_id, payload))
+ return True
+
+ stream = FeishuTaskStream(
+ "open-1",
+ "open_id",
+ send_raw=send_raw,
+ patch_card=patch_card,
+ display_text=_display_text,
+ send_initial_status=False,
+ )
+
+ stream.start()
+ assert sent == []
+ stream.step("读取配置", "detail")
+ assert sent == []
+ stream.step("运行验证", "detail")
+ assert len(sent) == 1
+ assert "第 1 轮 · 读取配置" in _payload_text(sent[0][1])
+ assert "第 2 轮 · 运行验证" in _payload_text(sent[0][1])
+
+
+def test_quiet_task_stream_sends_only_final_conclusion():
+ sent = []
+ patched = []
+
+ def send_raw(receive_id, payload, msg_type, rid_type):
+ sent.append((receive_id, payload, msg_type, rid_type))
+ return f"msg-{len(sent)}"
+
+ def patch_card(message_id, payload):
+ patched.append((message_id, payload))
+ return True
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=send_raw,
+ patch_card=patch_card,
+ display_text=_display_text,
+ quiet=True,
+ )
+
+ stream.start()
+ stream.pulse("working")
+ stream.step("读取配置", "### Tool Calls\n- `file_read`({})")
+ stream.done("最终结论")
+
+ assert len(sent) == 1
+ assert "Turn" not in _payload_text(sent[0][1])
+ assert "读取配置" not in _payload_text(sent[0][1])
+ assert "思考流" not in _payload_text(sent[0][1])
+ assert "Tool Calls" not in _payload_text(sent[0][1])
+ assert "最终结论" in _payload_text(sent[0][1])
+ assert not patched
+
+
+def test_group_compact_replies_with_progress_and_plain_final():
+ sent = []
+ replies = []
+ patched = []
+
+ def send_raw(receive_id, payload, msg_type, rid_type):
+ sent.append((receive_id, payload, msg_type, rid_type))
+ return f"msg-{len(sent)}"
+
+ def send_reply(message_id, payload, msg_type):
+ replies.append((message_id, payload, msg_type))
+ return f"reply-{len(replies)}"
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=send_raw,
+ patch_card=lambda message_id, payload: patched.append((message_id, payload)) or True,
+ display_text=_display_text,
+ group_compact=True,
+ group_progress_after_sec=0,
+ reply_to="om_123",
+ send_reply=send_reply,
+ )
+
+ stream.start()
+ stream.step("查 cron 配置", "### Tool Calls\n- `file_read`({})")
+ stream.done("**结论**\n\n收到,DAG 报告不会再发群里。")
+
+ assert sent == []
+ assert replies[0][0] == "om_123"
+ assert replies[0][2] == "interactive"
+ assert "第 1 轮 · 查 cron 配置" in _payload_text(replies[0][1])
+ assert "收到,DAG 报告不会再发群里。" in _payload_text(patched[-1][1])
+ assert replies[-1][2] == "text"
+ assert json.loads(replies[-1][1])["text"] == "收到,DAG 报告不会再发群里。"
+
+
+def test_terminal_workspace_ignores_late_pulse_after_done():
+ sent = []
+ patched = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append(payload) or "msg",
+ patch_card=lambda message_id, payload: patched.append(payload) or True,
+ display_text=_display_text,
+ group_compact=True,
+ workspace_progress_after_sec=0,
+ )
+
+ stream.step("读取配置", "detail")
+ stream.done("最终结论")
+ final_patch_count = len(patched)
+ stream.pulse("⏳ 工作中 · 999s")
+ stream.step("迟到进展", "detail")
+
+ assert len(patched) == final_patch_count
+ assert "✅ 已完成" in _payload_text(patched[-1])
+ assert "999s" not in _payload_text(patched[-1])
+
+
+def test_group_compact_replies_structured_final_as_post():
+ replies = []
+
+ def send_reply(message_id, payload, msg_type):
+ replies.append((message_id, payload, msg_type))
+ return f"reply-{len(replies)}"
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda *args: "raw",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ group_compact=True,
+ reply_to="om_123",
+ send_reply=send_reply,
+ )
+
+ stream.done(
+ "\n".join([
+ "**结论**",
+ "",
+ "已完成清理:",
+ "1. 保留短句普通回复",
+ "2. 长报告自动改成卡片",
+ "3. Dream 报告默认走卡片",
+ ])
+ )
+
+ assert replies[0][2] == "post"
+ payload = json.loads(replies[0][1])
+ assert payload["zh_cn"]["title"] == "已完成清理:"
+ flat = json.dumps(payload["zh_cn"]["content"], ensure_ascii=False)
+ assert "长报告自动改成卡片" in flat
+
+
+def test_group_compact_replies_operational_status_as_post():
+ replies = []
+
+ def send_reply(message_id, payload, msg_type):
+ replies.append((message_id, payload, msg_type))
+ return f"reply-{len(replies)}"
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda *args: "raw",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ group_compact=True,
+ reply_to="om_123",
+ send_reply=send_reply,
+ )
+
+ stream.done("状态稳定。Feishu 和 Weixin 在线,PID 97981。")
+
+ assert replies[0][2] == "post"
+ payload = json.loads(replies[0][1])
+ assert payload["zh_cn"]["title"] == "状态汇报"
+ assert "PID 97981" in _payload_text(replies[0][1])
+
+
+def test_group_compact_keeps_casual_short_reply_as_text():
+ replies = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda *args: "raw",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ group_compact=True,
+ reply_to="om_123",
+ send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply",
+ )
+
+ stream.done("我在,直接说。")
+
+ assert replies[0][2] == "text"
+ assert json.loads(replies[0][1])["text"] == "我在,直接说。"
+
+
+def test_private_casual_reply_stays_text_not_card():
+ sent = []
+
+ stream = FeishuTaskStream(
+ "open-1",
+ "open_id",
+ send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or "msg",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ )
+
+ stream.done("0.7 是个好数值,保持这个值挺好。")
+
+ assert sent[0][1] == "text"
+ assert json.loads(sent[0][0])["text"] == "0.7 是个好数值,保持这个值挺好。"
+
+
+def test_structured_private_reply_uses_post():
+ sent = []
+
+ stream = FeishuTaskStream(
+ "open-1",
+ "open_id",
+ send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append((payload, msg_type)) or "msg",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ )
+
+ stream.done("结论:已修复。\n\n1. 更新输出流\n2. 重启验证")
+
+ assert sent[0][1] == "post"
+ assert "更新输出流" in _payload_text(sent[0][0])
+
+
+def test_plain_final_classifier_keeps_reports_as_cards():
+ assert should_send_plain_final("我在,直接说。")
+ assert not should_send_plain_final("结论:已处理")
+ assert not should_send_plain_final("1. 先查日志\n2. 再重启")
+
+
+def test_final_card_surfaces_task_titles_from_output():
+ payload = build_final_card(
+ "\n".join([
+ "🎉 任务 1 & 2 全部完成!",
+ "",
+ "✅ 任务 1:盘中监控预警脚本",
+ "已创建。",
+ "",
+ "✅ 任务 2:K 线图可视化",
+ "已生成。",
+ ]),
+ title="已完成",
+ )
+
+ text = _payload_text(payload)
+ assert "完成清单" in text
+ assert "任务 1:盘中监控预警脚本" in text
+ assert "任务 2:K 线图可视化" in text
+
+
+def test_workspace_card_surfaces_task_plan_from_steps_and_outputs():
+ sent = []
+ patched = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda receive_id, payload, msg_type, rid_type: sent.append(payload) or "msg",
+ patch_card=lambda message_id, payload: patched.append(payload) or True,
+ display_text=_display_text,
+ )
+
+ stream.step(
+ "生成盘中监控预警脚本",
+ build_step_detail(
+ _Resp(),
+ [{"tool_name": "file_write", "args": {"path": "/tmp/scanner_ma5_monitor.py"}}],
+ _display_text,
+ tool_results=[{"status": "success", "output": "已写入 scanner_ma5_monitor.py"}],
+ ),
+ )
+ stream.step(
+ "绘制 K 线图",
+ build_step_detail(
+ _Resp(),
+ [{"tool_name": "code_run", "args": {"script": "python draw_kline.py"}}],
+ _display_text,
+ tool_results=[{"status": "success", "output": "已生成 kline_001259.png"}],
+ ),
+ )
+ stream.done("任务 1:盘中监控预警脚本\n任务 2:K 线图可视化\n\n全部完成。")
+
+ text = _payload_text(patched[-1])
+ assert "完成清单" in text
+ assert "任务 1:盘中监控预警脚本" in text
+ assert "任务 2:K 线图可视化" in text
+ assert "执行动作" in text
+ assert "产物/证据" in text
+ assert "scanner_ma5_monitor.py" in text
+
+
+def test_natural_group_final_strips_machine_heading():
+ assert natural_group_final("**结论**\n\n搞定了") == "搞定了"
+ assert natural_group_final("✅ 最终结论:已处理") == "已处理"
+
+
+def test_group_compact_delays_workspace_until_second_step():
+ replies = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda *args: "raw",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ group_compact=True,
+ reply_to="om_123",
+ send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply",
+ )
+
+ stream.step("刚开始处理", "detail")
+ assert replies == []
+ stream.step("继续验证", "detail")
+
+ assert replies
+ assert "第 1 轮 · 刚开始处理" in _payload_text(replies[0][1])
+ assert "第 2 轮 · 继续验证" in _payload_text(replies[0][1])
+
+
+def test_group_compact_cancel_sends_plain_stop_message():
+ replies = []
+
+ stream = FeishuTaskStream(
+ "chat-1",
+ "chat_id",
+ send_raw=lambda *args: "raw",
+ patch_card=lambda *_: True,
+ display_text=_display_text,
+ group_compact=True,
+ reply_to="om_123",
+ send_reply=lambda message_id, payload, msg_type: replies.append((message_id, payload, msg_type)) or "reply",
+ )
+
+ stream.cancel("已停止当前任务")
+
+ assert replies == [("om_123", json.dumps({"text": "已停止当前任务"}, ensure_ascii=False), "text")]