Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 64 additions & 35 deletions astrbot/dashboard/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,61 @@ def _init_jwt_secret(self) -> None:
logger.info("Initialized random JWT secret for dashboard.")
self._jwt_secret = self.config["dashboard"]["jwt_secret"]

@staticmethod
def _resolve_dashboard_ssl_config(
ssl_config: dict,
) -> tuple[bool, dict[str, str]]:
cert_file = (
os.environ.get("DASHBOARD_SSL_CERT")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_CERT")
or ssl_config.get("cert_file", "")
)
key_file = (
Comment on lines +303 to +312
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 question (security): Changing SSL misconfiguration from hard failure to warning+HTTP fallback significantly alters security posture.

Previously, missing/invalid cert_file, key_file, or ca_certs raised ValueError and stopped startup; now _resolve_dashboard_ssl_config only logs a warning and returns (False, {}), allowing the server to run over HTTP instead. This can silently downgrade a deployment that expects TLS to be mandatory. Consider adding an explicit mode (e.g., require_ssl flag or env var) that preserves the old fail-fast behavior for environments where TLS must not be bypassed.

os.environ.get("DASHBOARD_SSL_KEY")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_KEY")
or ssl_config.get("key_file", "")
)
ca_certs = (
os.environ.get("DASHBOARD_SSL_CA_CERTS")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_CA_CERTS")
or ssl_config.get("ca_certs", "")
)

if not cert_file or not key_file:
logger.warning(
"dashboard.ssl.enable 已启用,但未同时配置 cert_file 和 key_file,SSL 配置将不会生效。",
)
return False, {}

cert_path = Path(cert_file).expanduser()
key_path = Path(key_file).expanduser()
if not cert_path.is_file():
logger.warning(
f"dashboard.ssl.enable 已启用,但 SSL 证书文件不存在: {cert_path},SSL 配置将不会生效。",
)
return False, {}
if not key_path.is_file():
logger.warning(
f"dashboard.ssl.enable 已启用,但 SSL 私钥文件不存在: {key_path},SSL 配置将不会生效。",
)
return False, {}

resolved_ssl_config = {
"certfile": str(cert_path.resolve()),
"keyfile": str(key_path.resolve()),
}

if ca_certs:
ca_path = Path(ca_certs).expanduser()
if not ca_path.is_file():
logger.warning(
f"dashboard.ssl.enable 已启用,但 SSL CA 证书文件不存在: {ca_path},SSL 配置将不会生效。",
)
return False, {}
resolved_ssl_config["ca_certs"] = str(ca_path.resolve())

return True, resolved_ssl_config
Comment on lines +304 to +356
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The method _resolve_dashboard_ssl_config contains some repetitive logic for validating file paths. This can be refactored to improve readability and maintainability by extracting the validation logic into a nested helper function.

    def _resolve_dashboard_ssl_config(
        ssl_config: dict,
    ) -> tuple[bool, dict[str, str]]:
        def _validate_path(path_str: str, description: str) -> Path | None:
            if not path_str:
                return None
            path = Path(path_str).expanduser()
            if not path.is_file():
                logger.warning(
                    f"dashboard.ssl.enable 已启用,但 SSL {description} 不存在: {path},SSL 配置将不会生效。"
                )
                return None
            return path

        cert_file = (
            os.environ.get("DASHBOARD_SSL_CERT")
            or os.environ.get("ASTRBOT_DASHBOARD_SSL_CERT")
            or ssl_config.get("cert_file", "")
        )
        key_file = (
            os.environ.get("DASHBOARD_SSL_KEY")
            or os.environ.get("ASTRBOT_DASHBOARD_SSL_KEY")
            or ssl_config.get("key_file", "")
        )

        if not cert_file or not key_file:
            logger.warning(
                "dashboard.ssl.enable 已启用,但未同时配置 cert_file 和 key_file,SSL 配置将不会生效。"
            )
            return False, {}

        cert_path = _validate_path(cert_file, "证书文件")
        key_path = _validate_path(key_file, "私钥文件")

        if not cert_path or not key_path:
            return False, {}

        resolved_ssl_config = {
            "certfile": str(cert_path.resolve()),
            "keyfile": str(key_path.resolve()),
        }

        ca_certs = (
            os.environ.get("DASHBOARD_SSL_CA_CERTS")
            or os.environ.get("ASTRBOT_DASHBOARD_SSL_CA_CERTS")
            or ssl_config.get("ca_certs", "")
        )
        if ca_certs:
            ca_path = _validate_path(ca_certs, "CA 证书文件")
            if not ca_path:
                return False, {}
            resolved_ssl_config["ca_certs"] = str(ca_path.resolve())

        return True, resolved_ssl_config


def run(self):
ip_addr = []
dashboard_config = self.core_lifecycle.astrbot_config.get("dashboard", {})
Expand All @@ -322,6 +377,11 @@ def run(self):
or os.environ.get("ASTRBOT_DASHBOARD_SSL_ENABLE"),
bool(ssl_config.get("enable", False)),
)
resolved_ssl_config: dict[str, str] = {}
if ssl_enable:
ssl_enable, resolved_ssl_config = self._resolve_dashboard_ssl_config(
ssl_config,
)
scheme = "https" if ssl_enable else "http"

if not enable:
Expand Down Expand Up @@ -373,41 +433,10 @@ def run(self):
config = HyperConfig()
config.bind = [f"{host}:{port}"]
if ssl_enable:
cert_file = (
os.environ.get("DASHBOARD_SSL_CERT")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_CERT")
or ssl_config.get("cert_file", "")
)
key_file = (
os.environ.get("DASHBOARD_SSL_KEY")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_KEY")
or ssl_config.get("key_file", "")
)
ca_certs = (
os.environ.get("DASHBOARD_SSL_CA_CERTS")
or os.environ.get("ASTRBOT_DASHBOARD_SSL_CA_CERTS")
or ssl_config.get("ca_certs", "")
)

cert_path = Path(cert_file).expanduser()
key_path = Path(key_file).expanduser()
if not cert_file or not key_file:
raise ValueError(
"dashboard.ssl.enable 为 true 时,必须配置 cert_file 和 key_file。",
)
if not cert_path.is_file():
raise ValueError(f"SSL 证书文件不存在: {cert_path}")
if not key_path.is_file():
raise ValueError(f"SSL 私钥文件不存在: {key_path}")

config.certfile = str(cert_path.resolve())
config.keyfile = str(key_path.resolve())

if ca_certs:
ca_path = Path(ca_certs).expanduser()
if not ca_path.is_file():
raise ValueError(f"SSL CA 证书文件不存在: {ca_path}")
config.ca_certs = str(ca_path.resolve())
config.certfile = resolved_ssl_config["certfile"]
config.keyfile = resolved_ssl_config["keyfile"]
if "ca_certs" in resolved_ssl_config:
config.ca_certs = resolved_ssl_config["ca_certs"]

# 根据配置决定是否禁用访问日志
disable_access_log = dashboard_config.get("disable_access_log", True)
Expand Down
45 changes: 45 additions & 0 deletions tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,51 @@ async def test_get_stat(app: Quart, authenticated_header: dict):
assert data["status"] == "ok" and "platform" in data["data"]


@pytest.mark.asyncio
async def test_dashboard_ssl_missing_cert_and_key_falls_back_to_http(
core_lifecycle_td: AstrBotCoreLifecycle,
monkeypatch,
):
shutdown_event = asyncio.Event()
server = AstrBotDashboard(core_lifecycle_td, core_lifecycle_td.db, shutdown_event)
original_dashboard_config = copy.deepcopy(
core_lifecycle_td.astrbot_config.get("dashboard", {}),
)
warning_messages = []
Comment on lines +112 to +121
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (testing): This test can be flaky/misleading if SSL-related environment variables are set in the test environment.

_resolve_dashboard_ssl_config prefers environment variables (DASHBOARD_SSL_CERT / ASTRBOT_DASHBOARD_SSL_CERT, DASHBOARD_SSL_KEY / ASTRBOT_DASHBOARD_SSL_KEY) over the config values used in this test. Since the test doesn’t clear these env vars, a CI or local shell setting could cause SSL to be enabled and certfile/keyfile to be non-None, breaking the assertions. Please explicitly control these env vars in the test (e.g. with monkeypatch.delenv(..., raising=False) or monkeypatch.setenv(...)) so the test is isolated from the external environment.

info_messages = []

async def fake_serve(app, config, shutdown_trigger):
return config

try:
core_lifecycle_td.astrbot_config["dashboard"]["ssl"] = {
"enable": True,
"cert_file": "",
"key_file": "",
}
monkeypatch.setattr(server, "check_port_in_use", lambda port: False)
monkeypatch.setattr("astrbot.dashboard.server.serve", fake_serve)
monkeypatch.setattr(
"astrbot.dashboard.server.logger.warning",
lambda message: warning_messages.append(message),
)
monkeypatch.setattr(
"astrbot.dashboard.server.logger.info",
lambda message: info_messages.append(message),
)

config = await server.run()

assert getattr(config, "certfile", None) is None
assert getattr(config, "keyfile", None) is None
assert any("cert_file 和 key_file" in message for message in warning_messages)
assert any(
"正在启动 WebUI, 监听地址: http://" in message for message in info_messages
)
finally:
core_lifecycle_td.astrbot_config["dashboard"] = original_dashboard_config


@pytest.mark.asyncio
async def test_subagent_config_accepts_default_persona(
app: Quart,
Expand Down
Loading