Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 179 additions & 102 deletions astrbot/core/skills/skill_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def _normalize_skill_name(name: str | None) -> str:
return re.sub(r"\s+", "_", raw.strip())


_SKILL_MARKDOWN_FILENAMES = {"SKILL.md", "skill.md"}


def _default_sandbox_skill_path(name: str) -> str:
return f"{SANDBOX_WORKSPACE_ROOT}/{SANDBOX_SKILLS_ROOT}/{name}/SKILL.md"

Expand Down Expand Up @@ -64,6 +67,30 @@ def _is_ignored_zip_entry(name: str) -> bool:
return parts[0] == "__MACOSX"


def _normalize_zip_entry_path(name: str) -> PurePosixPath | None:
normalized = str(name or "").replace("\\", "/")
if not normalized:
return None
if normalized.startswith("/") or re.match(r"^[A-Za-z]:", normalized):
raise ValueError("Zip archive contains absolute paths.")

parts: list[str] = []
for part in PurePosixPath(normalized).parts:
if part in {"", "."}:
continue
if part == "..":
raise ValueError("Zip archive contains invalid relative paths.")
parts.append(part)

if not parts:
return None

path = PurePosixPath(*parts)
if _is_ignored_zip_entry(str(path)):
return None
return path


def _normalize_skill_markdown_path(skill_dir: Path) -> Path | None:
"""Return the canonical `SKILL.md` path for a skill directory.

Expand All @@ -87,6 +114,39 @@ def _normalize_skill_markdown_path(skill_dir: Path) -> Path | None:
return canonical


def _parse_skill_frontmatter(text: str) -> dict:
if not text.startswith("---"):
return {}
lines = text.splitlines()
if not lines or lines[0].strip() != "---":
return {}

end_idx = None
for i in range(1, len(lines)):
if lines[i].strip() == "---":
end_idx = i
break
if end_idx is None:
return {}

frontmatter = "\n".join(lines[1:end_idx])
try:
payload = yaml.safe_load(frontmatter) or {}
except yaml.YAMLError:
return {}
if not isinstance(payload, dict):
return {}
return payload


def _parse_frontmatter_name(text: str) -> str:
payload = _parse_skill_frontmatter(text)
name = payload.get("name", "")
if not isinstance(name, str):
return ""
return name.strip()


@dataclass
class SkillInfo:
name: str
Expand All @@ -110,33 +170,94 @@ def _parse_frontmatter_description(text: str) -> str:
description: What this skill does and when to use it.
---
"""
if not text.startswith("---"):
return ""
lines = text.splitlines()
if not lines or lines[0].strip() != "---":
return ""
end_idx = None
for i in range(1, len(lines)):
if lines[i].strip() == "---":
end_idx = i
break
if end_idx is None:
return ""

frontmatter = "\n".join(lines[1:end_idx])
try:
payload = yaml.safe_load(frontmatter) or {}
except yaml.YAMLError:
return ""
if not isinstance(payload, dict):
return ""

payload = _parse_skill_frontmatter(text)
description = payload.get("description", "")
if not isinstance(description, str):
return ""
return description.strip()


def _path_is_ancestor_or_same(ancestor: PurePosixPath, path: PurePosixPath) -> bool:
ancestor_parts = ancestor.parts
path_parts = path.parts
if len(ancestor_parts) > len(path_parts):
return False
return path_parts[: len(ancestor_parts)] == ancestor_parts


def _discover_skill_root_from_archive(
file_paths: list[PurePosixPath],
) -> PurePosixPath:
candidate_dirs = {
path.parent for path in file_paths if path.name in _SKILL_MARKDOWN_FILENAMES
}
if not candidate_dirs:
raise ValueError("SKILL.md not found in the zip archive.")

ordered_candidates = sorted(
candidate_dirs, key=lambda path: (len(path.parts), str(path))
)
shallowest_depth = len(ordered_candidates[0].parts)
shallowest_candidates = [
path for path in ordered_candidates if len(path.parts) == shallowest_depth
]
if len(shallowest_candidates) != 1:
raise ValueError(
"Multiple skill roots found in the zip archive. Please upload one skill per zip."
)

skill_root = shallowest_candidates[0]
for candidate in ordered_candidates[1:]:
if not _path_is_ancestor_or_same(skill_root, candidate):
raise ValueError(
"Multiple skill roots found in the zip archive. Please upload one skill per zip."
)
return skill_root
Comment on lines +188 to +215
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_discover_skill_root_from_archive() returns path.parent for SKILL.md entries, which for a root-level SKILL.md is typically PurePosixPath('.'). The subsequent logic treats .parts as indicating “has a directory”, which can make root-level archives behave like nested ones (e.g., the if not skill_root_rel.parts / if skill_root_rel.parts branches become misleading or unreachable), and it can also make the ancestor check treat the root as not an ancestor of nested candidates. Consider normalizing a root-level skill root to an explicit sentinel (e.g., empty tuple / None) or special-casing PurePosixPath('.') so root-depth is 0 and ancestor checks work as intended.

Copilot uses AI. Check for mistakes.


def _infer_skill_name(
skill_root_rel: PurePosixPath,
skill_md_path: Path,
zip_path: Path,
skill_name_hint: str | None = None,
) -> str:
candidates: list[str] = []
if skill_root_rel.parts:
candidates.append(skill_root_rel.name)

try:
skill_text = skill_md_path.read_text(encoding="utf-8")
except OSError:
skill_text = ""

frontmatter_name = _parse_frontmatter_name(skill_text)
if frontmatter_name:
candidates.append(frontmatter_name)

if skill_name_hint:
candidates.append(skill_name_hint)

zip_stem = zip_path.stem.strip()
if zip_stem:
candidates.append(zip_stem)

for candidate in candidates:
normalized_candidate = _normalize_skill_name(candidate)
if normalized_candidate and _SKILL_NAME_RE.fullmatch(normalized_candidate):
return normalized_candidate

sanitized_zip_stem = _normalize_skill_name(
re.sub(r"[^\w.-]+", "-", zip_path.stem).strip("._-")
)
if sanitized_zip_stem and _SKILL_NAME_RE.fullmatch(sanitized_zip_stem):
return sanitized_zip_stem

raise ValueError(
"Unable to determine a valid skill name. Add a valid `name` to SKILL.md "
"or package the skill inside a valid folder."
)


# Regex for sanitizing paths used in prompt examples — only allow
# safe path characters to prevent prompt injection via crafted skill paths.
_SAFE_PATH_RE = re.compile(r"[^\w./ ,()'\-]", re.UNICODE)
Expand Down Expand Up @@ -549,102 +670,58 @@ def install_skill_from_zip(
raise ValueError("Uploaded file is not a valid zip archive.")

with zipfile.ZipFile(zip_path) as zf:
names = [
name
for name in (entry.replace("\\", "/") for entry in zf.namelist())
if name and not _is_ignored_zip_entry(name)
]
file_names = [name for name in names if name and not name.endswith("/")]
if not file_names:
raise ValueError("Zip archive is empty.")

has_root_skill_md = any(
len(parts := PurePosixPath(name).parts) == 1
and parts[0] in {"SKILL.md", "skill.md"}
for name in file_names
)
root_mode = has_root_skill_md
validated_members: list[tuple[zipfile.ZipInfo, PurePosixPath]] = []
file_paths: list[PurePosixPath] = []

archive_skill_name = None
if skill_name_hint is not None:
archive_skill_name = _normalize_skill_name(skill_name_hint)
if archive_skill_name and not _SKILL_NAME_RE.fullmatch(
archive_skill_name
):
raise ValueError("Invalid skill name.")
for member in zf.infolist():
normalized_path = _normalize_zip_entry_path(member.filename)
if normalized_path is None:
continue
validated_members.append((member, normalized_path))
if not member.is_dir():
file_paths.append(normalized_path)
Comment on lines +673 to +682
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Extraction still uses the raw zip filenames rather than the normalized paths, which may diverge from the validated structure.

You compute a normalized path for each ZipInfo but then extract using member.filename, so the actual directory structure comes from the raw names, not the validated paths. To avoid discrepancies (especially with odd path forms), either extract using the normalized paths (e.g., manually writing files) or add a check that Path(member.filename) under tmp_dir resolves to the same location as normalized_path.

Suggested implementation:

            validated_members: list[tuple[zipfile.ZipInfo, PurePosixPath]] = []
            file_paths: list[PurePosixPath] = []
            normalized_paths: list[PurePosixPath] = []

            for member in zf.infolist():

To fully implement the suggestion, you should adjust the code inside and after the for member in zf.infolist(): loop as follows (exact code will depend on the current implementation):

  1. Inside the loop (where you already compute the normalized path):

    • Assuming you already compute something like:
      normalized_path = PurePosixPath(<some_normalization_logic_using_member.filename>)
    • Append it to normalized_paths:
      normalized_paths.append(normalized_path)
    • Before appending to validated_members, compute the resolved targets under tmp_dir (or whatever temporary root you use):
      raw_target = (tmp_dir / member.filename).resolve()
      normalized_target = (tmp_dir / normalized_path).resolve()
      if raw_target != normalized_target:
          raise ValueError(
              f"Zip member {member.filename!r} resolves to {raw_target} "
              f"which does not match validated normalized path {normalized_target}"
          )
    • Only after this check, append to validated_members:
      validated_members.append((member, normalized_path))
  2. Where you perform extraction:

    • If you currently do:
      for member in zf.infolist():
          zf.extract(member, path=tmp_dir)
      change it to iterate over validated_members and use the normalized path:
      for member, normalized_path in validated_members:
          target_path = (tmp_dir / normalized_path).resolve()
          target_path.parent.mkdir(parents=True, exist_ok=True)
          with zf.open(member, "r") as src, target_path.open("wb") as dst:
              shutil.copyfileobj(src, dst)
    • This ensures the actual filesystem layout derives from the validated normalized paths, not the raw zip filenames.
  3. Imports:

    • Ensure the file has:
      from pathlib import Path, PurePosixPath
      import shutil
      (or re-use existing imports if they are already present).

These changes will guarantee that either:

  • Extraction strictly follows the normalized/validated structure, or
  • A mismatch between the raw filename and normalized path is detected and rejected before extraction.


if root_mode:
archive_hint = _normalize_skill_name(
archive_skill_name or zip_path_obj.stem
)
if not archive_hint or not _SKILL_NAME_RE.fullmatch(archive_hint):
raise ValueError("Invalid skill name.")
skill_name = archive_hint
else:
top_dirs = {
PurePosixPath(name).parts[0] for name in file_names if name.strip()
}
if len(top_dirs) != 1:
raise ValueError(
"Zip archive must contain a single top-level folder."
)
archive_root_name = next(iter(top_dirs))
archive_root_name_normalized = _normalize_skill_name(archive_root_name)
if archive_root_name in {".", "..", ""} or not _SKILL_NAME_RE.fullmatch(
archive_root_name_normalized
):
raise ValueError("Invalid skill folder name.")
if archive_skill_name:
if not _SKILL_NAME_RE.fullmatch(archive_skill_name):
raise ValueError("Invalid skill name.")
skill_name = archive_skill_name
else:
skill_name = archive_root_name_normalized
if not file_paths:
raise ValueError("Zip archive is empty.")

for name in names:
if not name:
continue
if name.startswith("/") or re.match(r"^[A-Za-z]:", name):
raise ValueError("Zip archive contains absolute paths.")
parts = PurePosixPath(name).parts
if ".." in parts:
raise ValueError("Zip archive contains invalid relative paths.")
if (not root_mode) and parts and parts[0] != archive_root_name:
raise ValueError(
"Zip archive contains unexpected top-level entries."
)

if root_mode:
if "SKILL.md" not in file_names and "skill.md" not in file_names:
raise ValueError("SKILL.md not found in the skill folder.")
else:
if (
f"{archive_root_name}/SKILL.md" not in file_names
and f"{archive_root_name}/skill.md" not in file_names
):
raise ValueError("SKILL.md not found in the skill folder.")
skill_root_rel = _discover_skill_root_from_archive(file_paths)

with tempfile.TemporaryDirectory(dir=get_astrbot_temp_path()) as tmp_dir:
for member in zf.infolist():
member_name = member.filename.replace("\\", "/")
if not member_name or _is_ignored_zip_entry(member_name):
continue
extraction_root = Path(tmp_dir)
for member, _normalized_path in validated_members:
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

install_skill_from_zip() computes skill_root_rel before extraction, but still extracts all validated zip members into the temp directory, including files outside the detected skill root. This increases disk usage and exposure to zip-bomb style uploads, and it’s unnecessary since you already track each member’s normalized path. Consider filtering validated_members to only those under skill_root_rel (plus any required parent dirs) or rejecting archives that contain entries outside the chosen root to keep validation strict and avoid extracting unrelated content.

Suggested change
for member, _normalized_path in validated_members:
for member, normalized_path in validated_members:
# Only extract entries that belong to the detected skill root.
# If skill_root_rel has no parts, the whole archive is treated as the root.
if skill_root_rel.parts and not normalized_path.is_relative_to(
skill_root_rel
):
continue

Copilot uses AI. Check for mistakes.
zf.extract(member, tmp_dir)

src_dir = (
Path(tmp_dir) if root_mode else Path(tmp_dir) / archive_root_name
extraction_root
if not skill_root_rel.parts
else extraction_root.joinpath(*skill_root_rel.parts)
)
normalized_path = _normalize_skill_markdown_path(src_dir)
if normalized_path is None:
raise ValueError("SKILL.md not found in the skill folder.")
_normalize_skill_markdown_path(src_dir)
skill_md_path = _normalize_skill_markdown_path(src_dir)
if not src_dir.exists():
raise ValueError("Skill folder not found after extraction.")
if skill_md_path is None:
raise ValueError("SKILL.md not found in the skill package.")

skill_name = _infer_skill_name(
skill_root_rel,
skill_md_path,
zip_path_obj,
skill_name_hint=skill_name_hint,
)
dest_dir = Path(self.skills_root) / skill_name
if dest_dir.exists():
if not overwrite:
raise FileExistsError("Skill already exists.")
shutil.rmtree(dest_dir)
shutil.move(str(src_dir), str(dest_dir))

if skill_root_rel.parts:
shutil.move(str(src_dir), str(dest_dir))
else:
dest_dir.mkdir(parents=True, exist_ok=False)
for child in extraction_root.iterdir():
if child.name == "__MACOSX":
continue
shutil.move(str(child), str(dest_dir / child.name))

self.set_skill_active(skill_name, True)
return skill_name
4 changes: 2 additions & 2 deletions dashboard/src/i18n/locales/en-US/features/extension.json
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@
"empty": "No Skills found",
"emptyHint": "Upload a Skills zip to get started",
"uploadDialogTitle": "Upload Skills",
"uploadHint": "Upload multiple zip skill packages or drag them in. The system validates the structure automatically and shows a result for each file.",
"structureRequirement": "The archive supports multiple skills folders.",
"uploadHint": "Upload multiple zip skill packages or drag them in. The system detects the skill root automatically, validates the structure, and shows a result for each file.",
"structureRequirement": "The archive may either place the skill contents at the zip root or put `SKILL.md` inside a nested skill directory. The system will locate that root automatically, but each zip must still contain only one skill.",
"abilityMultiple": "Upload multiple zip files at once",
"abilityValidate": "Validate `SKILL.md` automatically",
"abilitySkip": "Automatically skip duplicate files.",
Expand Down
4 changes: 2 additions & 2 deletions dashboard/src/i18n/locales/ru-RU/features/extension.json
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@
"empty": "Навыки не найдены",
"emptyHint": "Пожалуйста, загрузите архив с навыками",
"uploadDialogTitle": "Загрузка навыков",
"uploadHint": "Поддерживается массовая загрузка zip-архивов. Вы также можете перетащить файлы в это окно. Система автоматически проверит структуру каждого архива.",
"structureRequirement": "Поддерживаются архивы с несколькими папками skills.",
"uploadHint": "Поддерживается массовая загрузка zip-архивов. Вы также можете перетащить файлы в это окно. Система автоматически определит корень skill, проверит структуру и покажет результат для каждого файла.",
"structureRequirement": "Архив может содержать содержимое skill прямо в корне zip или размещать `SKILL.md` во вложенной директории skill. Система найдёт корень автоматически, но один zip по-прежнему должен содержать только один skill.",
"abilityMultiple": "Поддержка массовой загрузки",
"abilityValidate": "Автопроверка `SKILL.md`",
"abilitySkip": "Пропуск дубликатов",
Expand Down
4 changes: 2 additions & 2 deletions dashboard/src/i18n/locales/zh-CN/features/extension.json
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@
"empty": "暂无 Skills",
"emptyHint": "请上传 Skills 压缩包",
"uploadDialogTitle": "上传 Skills",
"uploadHint": "支持批量上传 zip 技能包,也支持拖拽批量上传 zip 技能包。系统会自动校验目录结构,并给出逐个文件的结果。",
"structureRequirement": "支持压缩包内含多个 skills 文件夹。",
"uploadHint": "支持批量上传 zip 技能包,也支持拖拽批量上传 zip 技能包。系统会自动识别 skill 根目录、校验结构,并给出逐个文件的结果。",
"structureRequirement": "压缩包可以直接以 skill 内容为根目录,也可以把 `SKILL.md` 放在某个子目录中。系统会自动定位 skill 根目录,但一个 zip 仍然只能包含一个 skill。",
"abilityMultiple": "支持一次上传多个zip文件",
"abilityValidate": "自动校验 `SKILL.md`",
"abilitySkip": "自动跳过重复文件",
Expand Down
Loading
Loading