From c0dab644a47400f3465fab1955f9b06bb331f290 Mon Sep 17 00:00:00 2001 From: Simo Aleksandrov Date: Mon, 18 May 2026 00:44:56 +0300 Subject: [PATCH 1/2] feat(extensions): add Northflank sandbox, tools, and shell MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NorthflankSandboxClient + NorthflankSandboxSession (BaseSandboxClient / BaseSandboxSession) run agent workspaces inside a Northflank service container. Two modes: * attach to an existing service (service_id) — client never deletes it. * ephemeral deployment (image_path) — client creates the deployment and deletes it on cleanup. owned_by_client is recorded in session state so resume() preserves the ownership decision. Exec goes through the V1 exec WebSocket; file IO uses the SDK's file-copy API; workspace persistence is tar through /tmp staging. Tar payloads are validated locally before extraction because tar -xf runs remotely and doesn't apply Python's filter='data' policy. NorthflankCtx + northflank_tools() expose curated @function_tool wrappers around the Northflank REST API (list/get projects + services, deploy + restart + pause + resume + scale + wait, exec in service/job/addon containers, fetch/tail logs, fetch metrics, plus gated delete/secret/volume/domain mutations). Mutating tools carry needs_approval=True. NorthflankShellExecutor wires the same exec channel into the ShellTool executor interface for agents that don't need a full sandbox. Adds the northflank optional extra (Python <3.14 since the Northflank Python SDK targets 3.9-3.13) and the mypy module override. --- pyproject.toml | 5 + src/agents/extensions/northflank/__init__.py | 74 + src/agents/extensions/northflank/_helpers.py | 54 + src/agents/extensions/northflank/context.py | 34 + src/agents/extensions/northflank/shell.py | 151 ++ src/agents/extensions/northflank/tools.py | 722 +++++++++ src/agents/extensions/sandbox/__init__.py | 22 + .../extensions/sandbox/northflank/__init__.py | 15 + .../extensions/sandbox/northflank/sandbox.py | 1221 +++++++++++++++ .../extensions/northflank/test_tools_shell.py | 212 +++ tests/extensions/sandbox/test_northflank.py | 1376 +++++++++++++++++ 11 files changed, 3886 insertions(+) create mode 100644 src/agents/extensions/northflank/__init__.py create mode 100644 src/agents/extensions/northflank/_helpers.py create mode 100644 src/agents/extensions/northflank/context.py create mode 100644 src/agents/extensions/northflank/shell.py create mode 100644 src/agents/extensions/northflank/tools.py create mode 100644 src/agents/extensions/sandbox/northflank/__init__.py create mode 100644 src/agents/extensions/sandbox/northflank/sandbox.py create mode 100644 tests/extensions/northflank/test_tools_shell.py create mode 100644 tests/extensions/sandbox/test_northflank.py diff --git a/pyproject.toml b/pyproject.toml index f64090f8aa..c0049b0f5f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ e2b = ["e2b==2.20.0", "e2b-code-interpreter==2.4.1"] modal = ["modal==1.3.5"] runloop = ["runloop_api_client>=1.16.0,<2.0.0"] vercel = ["vercel>=0.5.6,<0.6"] +northflank = ["northflank>=1.0,<2; python_version < '3.14'"] s3 = ["boto3>=1.34"] temporal = [ "temporalio==1.26.0", @@ -164,6 +165,10 @@ ignore_missing_imports = true module = ["vercel", "vercel.*"] ignore_missing_imports = true +[[tool.mypy.overrides]] +module = ["northflank", "northflank.*"] +ignore_missing_imports = true + [tool.coverage.run] source = ["src/agents"] omit = [ diff --git a/src/agents/extensions/northflank/__init__.py b/src/agents/extensions/northflank/__init__.py new file mode 100644 index 0000000000..78fbc22ef5 --- /dev/null +++ b/src/agents/extensions/northflank/__init__.py @@ -0,0 +1,74 @@ +"""Northflank-platform extensions for the openai-agents SDK. + +Two surfaces live here, gated behind the optional ``northflank`` extra: + +* :func:`northflank_tools` — a curated ``tool_namespace()`` bundle of + typed ``@function_tool`` wrappers around the Northflank REST API + (list/get projects + services, deploy + restart + pause + resume + + scale + wait, one-shot exec in service/job/addon containers, fetch and + tail logs, fetch metrics, and gated delete/secret/volume/domain + mutations). +* :class:`NorthflankShellExecutor` — a :class:`agents.tool.ShellTool` + executor that runs each shell command remotely inside a Northflank + service container via the V1 exec WebSocket. + +Both share :class:`NorthflankCtx`, a typed dataclass that carries the +``AsyncApiClient`` plus optional default ``project_id`` / ``team_id`` for +the run. +""" + +from __future__ import annotations + +from .context import NorthflankCtx +from .shell import NorthflankShellExecutor +from .tools import ( + nf_create_deployment_service, + nf_delete_domain, + nf_delete_secret, + nf_delete_service, + nf_delete_volume, + nf_fetch_service_logs, + nf_get_project, + nf_get_service, + nf_get_service_metrics, + nf_list_projects, + nf_list_services, + nf_pause_service, + nf_put_secret, + nf_restart_service, + nf_resume_service, + nf_run_addon_command, + nf_run_job_command, + nf_run_service_command, + nf_scale_service, + nf_tail_service_logs, + nf_wait_service_ready, + northflank_tools, +) + +__all__ = [ + "NorthflankCtx", + "NorthflankShellExecutor", + "nf_create_deployment_service", + "nf_delete_domain", + "nf_delete_secret", + "nf_delete_service", + "nf_delete_volume", + "nf_fetch_service_logs", + "nf_get_project", + "nf_get_service", + "nf_get_service_metrics", + "nf_list_projects", + "nf_list_services", + "nf_pause_service", + "nf_put_secret", + "nf_restart_service", + "nf_resume_service", + "nf_run_addon_command", + "nf_run_job_command", + "nf_run_service_command", + "nf_scale_service", + "nf_tail_service_logs", + "nf_wait_service_ready", + "northflank_tools", +] diff --git a/src/agents/extensions/northflank/_helpers.py b/src/agents/extensions/northflank/_helpers.py new file mode 100644 index 0000000000..c18ac6d730 --- /dev/null +++ b/src/agents/extensions/northflank/_helpers.py @@ -0,0 +1,54 @@ +"""Internal helpers shared across the Northflank tool/shell modules.""" + +from __future__ import annotations + +from typing import Any, Literal + +from ...run_context import RunContextWrapper +from .context import NorthflankCtx + +ShellMode = Literal["none", "sh", "bash"] + + +def wrap_shell_command(command: str, shell: ShellMode) -> tuple[str | list[str], Literal["none"]]: + """Translate a one-line shell command into an SDK exec invocation. + + The Northflank exec proxy forwards the SDK's ``shell`` field verbatim + and only recognises ``"none"`` for direct exec. So we always set + ``shell="none"`` on the SDK side and, when a shell is requested, wrap + the command in an explicit ``[shell, "-lc", command]`` argv ourselves. + """ + if shell == "none": + return command, "none" + return [shell, "-lc", command], "none" + + +def resolve_project_id(ctx: RunContextWrapper[NorthflankCtx], project_id: str | None) -> str: + """Resolve ``project_id`` from the call args, falling back to the context. + + Raises a clear error if neither is set so the model gets actionable + feedback instead of a 404 from the API. + """ + resolved = project_id or ctx.context.project_id + if not resolved: + raise ValueError( + "project_id is required: pass it explicitly or set NorthflankCtx.project_id." + ) + return resolved + + +def resolve_team_id(ctx: RunContextWrapper[NorthflankCtx], team_id: str | None) -> str | None: + """Resolve ``team_id`` with context fallback. ``None`` is valid (uses the + user-scoped routes).""" + return team_id or ctx.context.team_id + + +def unwrap(response: Any) -> dict[str, Any]: + """Return the SDK response's ``.data`` payload or the raw value. + + Northflank ``ApiCallResponse.data`` is always a TypedDict shape; tools + surface it directly and the agents runtime takes care of JSON + stringification. + """ + payload = getattr(response, "data", response) + return payload if isinstance(payload, dict) else {"value": payload} diff --git a/src/agents/extensions/northflank/context.py b/src/agents/extensions/northflank/context.py new file mode 100644 index 0000000000..d070aee310 --- /dev/null +++ b/src/agents/extensions/northflank/context.py @@ -0,0 +1,34 @@ +"""Typed user context shared by every Northflank tool. + +Pass a :class:`NorthflankCtx` instance to ``Runner.run(context=...)``. Each +tool reads it via ``RunContextWrapper[NorthflankCtx].context``. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +try: + from northflank import AsyncApiClient +except ImportError as exc: # pragma: no cover - import path depends on optional extras + raise ImportError( + "Northflank tools require the optional `northflank` extra.\n" + "Install it with: pip install 'openai-agents[northflank]'" + ) from exc + + +@dataclass +class NorthflankCtx: + """Runtime context every Northflank tool expects. + + ``project_id`` is optional so the same context can drive read tools that + list across projects, but most mutating tools will raise if it is not + set (they accept it explicitly on the call too). + """ + + client: AsyncApiClient + project_id: str | None = None + team_id: str | None = None + + +__all__ = ["NorthflankCtx"] diff --git a/src/agents/extensions/northflank/shell.py b/src/agents/extensions/northflank/shell.py new file mode 100644 index 0000000000..5fa183d68c --- /dev/null +++ b/src/agents/extensions/northflank/shell.py @@ -0,0 +1,151 @@ +"""Northflank-backed implementation of the agents ShellTool executor. + +Plug into the agents SDK like so:: + + from agents import Agent, ShellTool + from agents.extensions.northflank import NorthflankShellExecutor, NorthflankCtx + + executor = NorthflankShellExecutor(service_id="my-svc", project_id="my-proj") + agent = Agent( + name="ops", + tools=[ShellTool(executor=executor, needs_approval=True)], + ) + await Runner.run(agent, "free -h", context=NorthflankCtx(client=client)) + +The executor pulls the ``AsyncApiClient`` (and any unset project/team) from +:class:`NorthflankCtx` on the run context, so a single executor instance can +be reused across runs that share the same target service. +""" + +from __future__ import annotations + +from ...tool import ( + ShellCallOutcome, + ShellCommandOutput, + ShellCommandRequest, + ShellResult, +) +from ._helpers import ShellMode, wrap_shell_command +from .context import NorthflankCtx + +try: + from northflank import AsyncApiClient +except ImportError as exc: # pragma: no cover - import path depends on optional extras + raise ImportError( + "Northflank shell executor requires the optional `northflank` extra.\n" + "Install it with: pip install 'openai-agents[northflank]'" + ) from exc + + +class NorthflankShellExecutor: + """ShellTool executor that runs each command inside a Northflank service. + + Construction args bind the executor to one target container. ``client``, + ``project_id`` and ``team_id`` fall back to the :class:`NorthflankCtx` on + the request if not provided at construction time. + """ + + def __init__( + self, + *, + service_id: str, + project_id: str | None = None, + team_id: str | None = None, + client: AsyncApiClient | None = None, + shell: ShellMode = "sh", + instance_name: str | None = None, + container_name: str | None = None, + encoding: str = "utf-8", + default_timeout_s: float = 60.0, + ) -> None: + self.service_id = service_id + self.project_id = project_id + self.team_id = team_id + self.client = client + self.shell = shell + self.instance_name = instance_name + self.container_name = container_name + self.encoding = encoding + self.default_timeout_s = default_timeout_s + + def _resolve(self, request: ShellCommandRequest) -> tuple[AsyncApiClient, str, str | None]: + ctx_obj = getattr(request.ctx_wrapper, "context", None) + client = self.client + project_id = self.project_id + team_id = self.team_id + + if isinstance(ctx_obj, NorthflankCtx): + client = client or ctx_obj.client + project_id = project_id or ctx_obj.project_id + team_id = team_id or ctx_obj.team_id + + if client is None: + raise RuntimeError( + "NorthflankShellExecutor needs an AsyncApiClient: pass one to " + "the constructor or provide a NorthflankCtx on the run." + ) + if not project_id: + raise RuntimeError( + "NorthflankShellExecutor needs project_id: pass it to the " + "constructor or set NorthflankCtx.project_id." + ) + return client, project_id, team_id + + async def __call__(self, request: ShellCommandRequest) -> ShellResult: + client, project_id, team_id = self._resolve(request) + action = request.data.action + timeout_s = action.timeout_ms / 1000.0 if action.timeout_ms else self.default_timeout_s + + outputs: list[ShellCommandOutput] = [] + for command in action.commands: + sdk_command, sdk_shell = wrap_shell_command(command, self.shell) + try: + result = await client.exec.arun_service_command( + project_id=project_id, + service_id=self.service_id, + command=sdk_command, + team_id=team_id, + shell=sdk_shell, + instance_name=self.instance_name, + container_name=self.container_name, + encoding=self.encoding, + timeout=timeout_s, + ) + except TimeoutError as exc: + outputs.append( + ShellCommandOutput( + command=command, + stdout="", + stderr=str(exc), + outcome=ShellCallOutcome(type="timeout", exit_code=None), + ) + ) + break + + outputs.append( + ShellCommandOutput( + command=command, + stdout=result.stdout, + stderr=result.stderr, + outcome=ShellCallOutcome(type="exit", exit_code=result.exit_code), + provider_data={ + "status": result.status, + "service_id": self.service_id, + "project_id": project_id, + }, + ) + ) + + if result.exit_code not in (0, None): + # Mirror local shell semantics: abort the batch on the first + # non-zero exit so the model sees a clean failure. + break + + return ShellResult( + output=outputs, + max_output_length=action.max_output_length, + provider_data={"service_id": self.service_id, "project_id": project_id}, + ) + + +__all__ = ["NorthflankShellExecutor"] diff --git a/src/agents/extensions/northflank/tools.py b/src/agents/extensions/northflank/tools.py new file mode 100644 index 0000000000..49c875ef1a --- /dev/null +++ b/src/agents/extensions/northflank/tools.py @@ -0,0 +1,722 @@ +"""Curated Northflank tools for the OpenAI Agents Python SDK. + +Each tool is a top-level ``FunctionTool`` produced by ``@function_tool``. +They share a single typed context (:class:`NorthflankCtx`) so the agent can +operate over an authenticated SDK client without re-passing credentials at +every call. + +Use :func:`northflank_tools` to get the curated list grouped under a tool +namespace. Pass ``include_secrets``, ``include_volumes``, ``include_domains``, +or ``include_delete`` to opt-in to gated mutations. +""" + +from __future__ import annotations + +import asyncio +from typing import Any, Literal + +from ...run_context import RunContextWrapper +from ...tool import FunctionTool, function_tool, tool_namespace +from ._helpers import ShellMode, resolve_project_id, resolve_team_id, unwrap, wrap_shell_command +from .context import NorthflankCtx + +# --------------------------------------------------------------------------- +# Read tools +# --------------------------------------------------------------------------- + + +@function_tool +async def nf_list_projects( + ctx: RunContextWrapper[NorthflankCtx], + team_id: str | None = None, +) -> dict[str, Any]: + """List every Northflank project visible to the configured API token. + + Walks pagination so the model sees the full set, not just the first + page. + + Args: + team_id: Optional team scope. Falls back to NorthflankCtx.team_id. + """ + response = await ctx.context.client.list.projects.all(team_id=resolve_team_id(ctx, team_id)) + return unwrap(response) + + +@function_tool +async def nf_get_project( + ctx: RunContextWrapper[NorthflankCtx], + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Fetch a single Northflank project by ID.""" + response = await ctx.context.client.get.project( + project_id=resolve_project_id(ctx, project_id), + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool +async def nf_list_services( + ctx: RunContextWrapper[NorthflankCtx], + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """List every service in a Northflank project (walks pagination).""" + response = await ctx.context.client.list.services.all( + project_id=resolve_project_id(ctx, project_id), + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool +async def nf_get_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Fetch a service's full configuration and current status.""" + response = await ctx.context.client.get.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +# --------------------------------------------------------------------------- +# Deploy / lifecycle tools (mutating — needs_approval=True) +# --------------------------------------------------------------------------- + + +@function_tool(needs_approval=True, strict_mode=False) +async def nf_create_deployment_service( + ctx: RunContextWrapper[NorthflankCtx], + name: str, + image_path: str, + instances: int = 1, + deployment_plan: str = "nf-compute-20", + description: str | None = None, + ports: list[dict[str, Any]] | None = None, + runtime_environment: dict[str, str] | None = None, + image_credentials: str | None = None, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Create a deployment service from an external Docker image. + + Args: + name: New service name. + image_path: Fully-qualified image reference (e.g. ``nginx:1.27``). + instances: Initial replica count. + deployment_plan: Northflank compute plan ID (defaults to nf-compute-20). + description: Optional human-readable description. + ports: Optional list of port specs (each with internalPort, name, protocol). + runtime_environment: Optional environment variables (name -> value). + image_credentials: Optional registry credentials addon ID for private images. + """ + data: dict[str, Any] = { + "name": name, + "billing": {"deploymentPlan": deployment_plan}, + "deployment": { + "instances": instances, + "external": {"imagePath": image_path}, + }, + } + if image_credentials: + data["deployment"]["external"]["credentials"] = image_credentials + if description: + data["description"] = description + if ports: + data["ports"] = ports + if runtime_environment: + data["runtimeEnvironment"] = runtime_environment + + response = await ctx.context.client.create.service.deployment( + project_id=resolve_project_id(ctx, project_id), + team_id=resolve_team_id(ctx, team_id), + data=data, + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_restart_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Restart all running containers of a service.""" + response = await ctx.context.client.restart.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_pause_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Pause a service (scale to zero, retain configuration).""" + response = await ctx.context.client.pause.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_resume_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + instances: int | None = None, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Resume a paused service. + + Args: + instances: Optional target replica count on resume. + """ + data: dict[str, Any] = {} + if instances is not None: + data["instances"] = instances + response = await ctx.context.client.resume.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + data=data, + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_scale_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + instances: int | None = None, + deployment_plan: str | None = None, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Change replica count or compute plan for a service.""" + data: dict[str, Any] = {} + if instances is not None: + data["instances"] = instances + if deployment_plan is not None: + data["deploymentPlan"] = deployment_plan + if not data: + raise ValueError("nf_scale_service requires instances or deployment_plan.") + response = await ctx.context.client.scale.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + data=data, + ) + return unwrap(response) + + +@function_tool +async def nf_wait_service_ready( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + timeout_s: float = 300.0, + poll_interval_s: float = 3.0, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Poll a service until its deployment status is COMPLETED.""" + payload: dict[str, Any] = await ctx.context.client.helpers.wait_for_service_ready( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + timeout_s=timeout_s, + poll_interval_s=poll_interval_s, + ) + return payload + + +# --------------------------------------------------------------------------- +# Exec tools (mutating — needs_approval=True) +# --------------------------------------------------------------------------- + + +def _exec_to_dict(result: Any) -> dict[str, Any]: + return { + "exit_code": result.exit_code, + "status": result.status, + "stdout": result.stdout, + "stderr": result.stderr, + "ok": result.ok, + "message": getattr(result, "message", ""), + } + + +@function_tool(needs_approval=True) +async def nf_run_service_command( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + command: str, + shell: ShellMode = "sh", + instance_name: str | None = None, + container_name: str | None = None, + timeout: float = 60.0, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Run a one-shot command inside a service container via Northflank exec. + + Args: + command: A single shell command line (e.g. "ls -la /tmp | head"). + shell: Shell to run the command in. ``sh`` / ``bash`` wrap the command + in ``[shell, "-lc", command]`` so pipes and redirection work. + Use ``none`` to skip the shell — in that case ``command`` is passed + to ``exec`` directly with the kernel splitting on whitespace, so + shell features will not be interpreted. + instance_name: Specific replica to target; otherwise the proxy picks one. + container_name: Specific sidecar container name, if the service has many. + timeout: Seconds to wait for completion. + """ + sdk_command, sdk_shell = wrap_shell_command(command, shell) + result = await ctx.context.client.exec.arun_service_command( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + command=sdk_command, + team_id=resolve_team_id(ctx, team_id), + shell=sdk_shell, + instance_name=instance_name, + container_name=container_name, + timeout=timeout, + ) + return _exec_to_dict(result) + + +@function_tool(needs_approval=True) +async def nf_run_job_command( + ctx: RunContextWrapper[NorthflankCtx], + job_id: str, + command: str, + shell: ShellMode = "sh", + instance_name: str | None = None, + container_name: str | None = None, + timeout: float = 60.0, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Run a one-shot command inside a running job container. + + See :func:`nf_run_service_command` for the ``shell`` semantics. + """ + sdk_command, sdk_shell = wrap_shell_command(command, shell) + result = await ctx.context.client.exec.arun_job_command( + project_id=resolve_project_id(ctx, project_id), + job_id=job_id, + command=sdk_command, + team_id=resolve_team_id(ctx, team_id), + shell=sdk_shell, + instance_name=instance_name, + container_name=container_name, + timeout=timeout, + ) + return _exec_to_dict(result) + + +@function_tool(needs_approval=True) +async def nf_run_addon_command( + ctx: RunContextWrapper[NorthflankCtx], + addon_id: str, + command: str, + shell: ShellMode = "sh", + instance_name: str | None = None, + container_name: str | None = None, + timeout: float = 60.0, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Run a one-shot command inside an addon (e.g. database) container. + + See :func:`nf_run_service_command` for the ``shell`` semantics. + """ + sdk_command, sdk_shell = wrap_shell_command(command, shell) + result = await ctx.context.client.exec.arun_addon_command( + project_id=resolve_project_id(ctx, project_id), + addon_id=addon_id, + command=sdk_command, + team_id=resolve_team_id(ctx, team_id), + shell=sdk_shell, + instance_name=instance_name, + container_name=container_name, + timeout=timeout, + ) + return _exec_to_dict(result) + + +# --------------------------------------------------------------------------- +# Logs tools +# --------------------------------------------------------------------------- + + +def _log_lines_to_list(lines: Any) -> list[dict[str, Any]]: + return [ + { + "ts": getattr(ln, "ts", ""), + "container_id": getattr(ln, "container_id", ""), + "log": ln.log, + } + for ln in lines + ] + + +@function_tool +async def nf_fetch_service_logs( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + log_type: Literal["runtime", "build"] = "runtime", + line_limit: int = 200, + duration_seconds: int = 600, + direction: Literal["backward", "forward"] = "backward", + text_includes: str | None = None, + project_id: str | None = None, + team_id: str | None = None, +) -> list[dict[str, Any]]: + """Fetch a bounded range of service logs over plain HTTP. + + Args: + log_type: ``runtime`` for app logs, ``build`` for build logs. + line_limit: Max number of log lines to return (defaults to 200). + duration_seconds: How far back to look from ``now`` (when direction is backward). + direction: ``backward`` searches from now into the past; ``forward`` is the inverse. + text_includes: Optional substring filter applied server-side. + """ + pid = resolve_project_id(ctx, project_id) + tid = resolve_team_id(ctx, team_id) + # The SDK exposes ``fetch_service_logs`` as a sync method; offload it so + # we don't block the event loop. + lines = await asyncio.to_thread( + ctx.context.client.logs.fetch_service_logs, + project_id=pid, + service_id=service_id, + team_id=tid, + log_type=log_type, + line_limit=line_limit, + duration_seconds=duration_seconds, + direction=direction, + text_includes=text_includes, + ) + return _log_lines_to_list(lines) + + +@function_tool +async def nf_tail_service_logs( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + max_lines: int = 100, + timeout_s: float = 10.0, + project_id: str | None = None, + team_id: str | None = None, +) -> list[dict[str, Any]]: + """Tail live service logs for a bounded duration / line count. + + Useful when ``fetch_service_logs`` is empty because the service has only + just started emitting output. + """ + pid = resolve_project_id(ctx, project_id) + tid = resolve_team_id(ctx, team_id) + collected: list[dict[str, Any]] = [] + + async def _run() -> None: + tail = await ctx.context.client.logs.atail_service_logs( + project_id=pid, service_id=service_id, team_id=tid + ) + async with tail: + async for ln in tail: + collected.append( + { + "ts": getattr(ln, "ts", ""), + "container_id": getattr(ln, "container_id", ""), + "log": ln.log, + } + ) + if len(collected) >= max_lines: + break + + try: + await asyncio.wait_for(_run(), timeout=timeout_s) + except asyncio.TimeoutError: + pass + return collected + + +# --------------------------------------------------------------------------- +# Metrics tools +# --------------------------------------------------------------------------- + + +MetricType = Literal[ + "cpu", + "memory", + "networkIngress", + "networkEgress", + "tcpConnectionsOpen", + "diskUsage", + "requests", + "http4xxResponses", + "http5xxResponses", + "bandwidth", + "bandwidthVolume", +] + + +@function_tool +async def nf_get_service_metrics( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + metric_types: list[MetricType] | None = None, + query_type: Literal["range", "single"] = "range", + duration: int | None = 300, + start_time: str | None = None, + end_time: str | None = None, + container_name: str | None = None, + deployment_id: str | None = None, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Fetch service metrics from the Northflank monitoring API. + + Args: + metric_types: Which metrics to fetch (default: cpu and memory). + query_type: ``range`` for a time series, ``single`` for one timestamp. + duration: For range queries, lookback window in seconds. + start_time / end_time: ISO-8601 timestamps overriding ``duration``. + container_name: Restrict to a single sidecar. + deployment_id: Restrict to a specific deployment. + """ + response = await ctx.context.client.get.service.metrics( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + metric_types=metric_types or ["cpu", "memory"], + query_type=query_type, + duration=duration, + start_time=start_time, + end_time=end_time, + container_name=container_name, + deployment_id=deployment_id, + ) + return unwrap(response) + + +# --------------------------------------------------------------------------- +# Gated mutations (opt-in via northflank_tools(include_delete=True, ...)) +# --------------------------------------------------------------------------- + + +@function_tool(needs_approval=True) +async def nf_delete_service( + ctx: RunContextWrapper[NorthflankCtx], + service_id: str, + delete_child_objects: bool = False, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Delete a service. Pass delete_child_objects=True to remove dependent + resources (volumes, secrets) as well.""" + response = await ctx.context.client.delete.service( + project_id=resolve_project_id(ctx, project_id), + service_id=service_id, + team_id=resolve_team_id(ctx, team_id), + delete_child_objects=delete_child_objects, + ) + return unwrap(response) + + +@function_tool(needs_approval=True, strict_mode=False) +async def nf_put_secret( + ctx: RunContextWrapper[NorthflankCtx], + data: dict[str, Any], + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Create or update a project-scoped secret. + + The ``data`` payload is a Northflank ``CreateSecretRequest``: + + - ``name`` (required): secret group name within the project. + - ``priority`` (required): integer used to resolve precedence when + multiple secrets define the same key (higher wins). + - ``secretType`` (required): one of ``"environment-arguments"``, + ``"environment"``, ``"arguments"``. + - ``secrets`` (optional): the actual values, with sub-keys ``variables`` + (env vars), ``files`` (mounted files), and ``dockerSecretMounts``. + - Additional optional fields: ``description``, ``restrictions``, + ``stageId``, ``tags``, ``type`` (``"secret"`` | ``"config"``), + ``addonDependencies``, ``externalAddonDependencies``. + + Minimum example:: + + { + "name": "api-keys", + "priority": 10, + "secretType": "environment", + "secrets": {"variables": {"OPENAI_API_KEY": "sk-..."}}, + } + """ + response = await ctx.context.client.put.secret( + project_id=resolve_project_id(ctx, project_id), + team_id=resolve_team_id(ctx, team_id), + data=data, + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_delete_secret( + ctx: RunContextWrapper[NorthflankCtx], + secret_id: str, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Delete a project-scoped secret.""" + response = await ctx.context.client.delete.secret( + project_id=resolve_project_id(ctx, project_id), + secret_id=secret_id, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_delete_volume( + ctx: RunContextWrapper[NorthflankCtx], + volume_id: str, + project_id: str | None = None, + team_id: str | None = None, +) -> dict[str, Any]: + """Delete a persistent volume.""" + response = await ctx.context.client.delete.volume( + project_id=resolve_project_id(ctx, project_id), + volume_id=volume_id, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +@function_tool(needs_approval=True) +async def nf_delete_domain( + ctx: RunContextWrapper[NorthflankCtx], + domain: str, + team_id: str | None = None, +) -> dict[str, Any]: + """Delete a custom domain registration.""" + response = await ctx.context.client.delete.domain( + domain=domain, + team_id=resolve_team_id(ctx, team_id), + ) + return unwrap(response) + + +# --------------------------------------------------------------------------- +# Public namespace factory +# --------------------------------------------------------------------------- + + +_READ_TOOLS: list[FunctionTool] = [ + nf_list_projects, + nf_get_project, + nf_list_services, + nf_get_service, +] + +_DEPLOY_TOOLS: list[FunctionTool] = [ + nf_create_deployment_service, + nf_restart_service, + nf_pause_service, + nf_resume_service, + nf_scale_service, + nf_wait_service_ready, +] + +_EXEC_TOOLS: list[FunctionTool] = [ + nf_run_service_command, + nf_run_job_command, + nf_run_addon_command, +] + +_LOG_TOOLS: list[FunctionTool] = [nf_fetch_service_logs, nf_tail_service_logs] +_METRICS_TOOLS: list[FunctionTool] = [nf_get_service_metrics] + + +def northflank_tools( + *, + include_delete: bool = False, + include_secrets: bool = False, + include_volumes: bool = False, + include_domains: bool = False, + namespace: str = "northflank", + description: str = ( + "Read, deploy, scale, exec, log, and inspect Northflank services. " + "Mutating actions require approval." + ), +) -> list[FunctionTool]: + """Return the curated Northflank tool list grouped under a tool namespace. + + Gated mutating tools (delete, secrets, volumes, domains) are off by + default — opt-in per category to keep the model's tool surface small. + """ + tools: list[FunctionTool] = [] + tools.extend(_READ_TOOLS) + tools.extend(_DEPLOY_TOOLS) + tools.extend(_EXEC_TOOLS) + tools.extend(_LOG_TOOLS) + tools.extend(_METRICS_TOOLS) + if include_delete: + tools.append(nf_delete_service) + if include_secrets: + tools.extend([nf_put_secret, nf_delete_secret]) + if include_volumes: + tools.append(nf_delete_volume) + if include_domains: + tools.append(nf_delete_domain) + + return tool_namespace(name=namespace, description=description, tools=tools) + + +__all__ = [ + "northflank_tools", + # Read tools + "nf_list_projects", + "nf_get_project", + "nf_list_services", + "nf_get_service", + # Deploy / lifecycle + "nf_create_deployment_service", + "nf_restart_service", + "nf_pause_service", + "nf_resume_service", + "nf_scale_service", + "nf_wait_service_ready", + # Exec + "nf_run_service_command", + "nf_run_job_command", + "nf_run_addon_command", + # Logs + "nf_fetch_service_logs", + "nf_tail_service_logs", + # Metrics + "nf_get_service_metrics", + # Gated + "nf_delete_service", + "nf_put_secret", + "nf_delete_secret", + "nf_delete_volume", + "nf_delete_domain", +] diff --git a/src/agents/extensions/sandbox/__init__.py b/src/agents/extensions/sandbox/__init__.py index d7b082ba1f..29e574c714 100644 --- a/src/agents/extensions/sandbox/__init__.py +++ b/src/agents/extensions/sandbox/__init__.py @@ -109,6 +109,18 @@ except Exception: # pragma: no cover _HAS_VERCEL = False +try: + from .northflank import ( + NorthflankSandboxClient as NorthflankSandboxClient, + NorthflankSandboxClientOptions as NorthflankSandboxClientOptions, + NorthflankSandboxSession as NorthflankSandboxSession, + NorthflankSandboxSessionState as NorthflankSandboxSessionState, + ) + + _HAS_NORTHFLANK = True +except Exception: # pragma: no cover + _HAS_NORTHFLANK = False + __all__: list[str] = [] if _HAS_E2B: @@ -207,3 +219,13 @@ "RunloopUserParameters", ] ) + +if _HAS_NORTHFLANK: + __all__.extend( + [ + "NorthflankSandboxClient", + "NorthflankSandboxClientOptions", + "NorthflankSandboxSession", + "NorthflankSandboxSessionState", + ] + ) diff --git a/src/agents/extensions/sandbox/northflank/__init__.py b/src/agents/extensions/sandbox/northflank/__init__.py new file mode 100644 index 0000000000..0fa47b41f7 --- /dev/null +++ b/src/agents/extensions/sandbox/northflank/__init__.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from .sandbox import ( + NorthflankSandboxClient, + NorthflankSandboxClientOptions, + NorthflankSandboxSession, + NorthflankSandboxSessionState, +) + +__all__ = [ + "NorthflankSandboxClient", + "NorthflankSandboxClientOptions", + "NorthflankSandboxSession", + "NorthflankSandboxSessionState", +] diff --git a/src/agents/extensions/sandbox/northflank/sandbox.py b/src/agents/extensions/sandbox/northflank/sandbox.py new file mode 100644 index 0000000000..a0b5bdaa54 --- /dev/null +++ b/src/agents/extensions/sandbox/northflank/sandbox.py @@ -0,0 +1,1221 @@ +"""Northflank-backed sandbox client + session for the openai-agents SDK. + +The sandbox runs *inside* a Northflank service container. Two modes: + +* **Attach** to an existing service (``service_id`` given). The client never + deletes the service on cleanup — it only owns the exec/file connections. +* **Ephemeral** deployment (``image_path`` given, ``service_id`` omitted). + The client creates a deployment service on ``create()`` and deletes it on + ``delete()``. ``owned_by_client`` is recorded in the session state so a + resumed session can recover the same ownership decision. + +Implementation notes: + +* All command execution goes through the Northflank V1 exec WebSocket via + ``client.exec.arun_service_command``. The SDK's ``shell`` field is + forwarded verbatim to the exec proxy and only ``"none"`` is meaningful + for direct argv invocation, so commands always run as argv lists with + ``shell="none"``. +* File IO uses ``client.files.aupload`` / ``adownload`` because the exec + channel decodes stdout as text. Binary payloads (read, write, tar) round + trip through ``/tmp`` in the container plus a local tempfile; the staging + files are cleaned up best-effort. +""" + +from __future__ import annotations + +import base64 +import io +import shlex +import tarfile +import tempfile +import uuid +from collections.abc import Awaitable, Callable +from pathlib import Path, PurePosixPath +from typing import Any, Literal + +from ....sandbox.errors import ( + ExecTimeoutError, + ExecTransportError, + WorkspaceArchiveReadError, + WorkspaceArchiveWriteError, + WorkspaceReadNotFoundError, +) +from ....sandbox.manifest import Manifest +from ....sandbox.session import ( + BaseSandboxSession, + SandboxSession, + SandboxSessionState, +) +from ....sandbox.session.manager import Instrumentation +from ....sandbox.session.runtime_helpers import ( + RESOLVE_WORKSPACE_PATH_HELPER, + RuntimeHelperScript, +) +from ....sandbox.session.sandbox_client import ( + BaseSandboxClient, + BaseSandboxClientOptions, +) +from ....sandbox.session.tar_workspace import shell_tar_exclude_args +from ....sandbox.snapshot import SnapshotBase, SnapshotSpec, resolve_snapshot +from ....sandbox.types import ExecResult, User + +try: + from northflank import ApiCallError, AsyncApiClient +except ImportError as exc: # pragma: no cover - import path depends on optional extras + raise ImportError( + "Northflank sandbox support requires the optional `northflank` extra.\n" + "Install it with: pip install 'openai-agents[northflank]'" + ) from exc + + +__all__ = [ + "NorthflankSandboxClient", + "NorthflankSandboxClientOptions", + "NorthflankSandboxSession", + "NorthflankSandboxSessionState", +] + + +# --------------------------------------------------------------------------- +# Options + state +# --------------------------------------------------------------------------- + + +class NorthflankSandboxClientOptions(BaseSandboxClientOptions): + """Configuration for :class:`NorthflankSandboxClient`. + + Provide either ``service_id`` (attach to an existing service) or + ``image_path`` (create an ephemeral deployment service that the client + will delete on cleanup). Providing both is an error; providing neither + is also an error. + """ + + type: Literal["northflank"] = "northflank" + project_id: str + service_id: str | None = None + team_id: str | None = None + image_path: str | None = None + image_credentials: str | None = None + deployment_plan: str = "nf-compute-20" + instances: int = 1 + instance_name: str | None = None + container_name: str | None = None + wait_for_ready: bool = True + wait_timeout_s: float = 300.0 + exec_timeout_s: float = 60.0 + service_name_prefix: str = "sandbox-" + + # Container CMD/ENTRYPOINT overrides for ephemeral deployments. Stock + # base images such as ``ubuntu:24.04`` exit immediately because their + # default CMD finishes; supply ``docker_command="sleep infinity"`` (or a + # similar long-lived command) so the service stays up and exec calls + # can attach. Ignored when ``service_id`` is set. + docker_entrypoint: str | None = None + docker_command: str | None = None + + # Workspace persistence strategy. + # + # * ``None`` (default): the workspace lives in the container filesystem. + # Deleting the service drops the workspace; resuming requires the + # service to still exist with its original storage. + # * ``"volume"``: the client provisions a Northflank volume mounted at + # ``manifest.root`` and attaches it to the service. The volume + # survives service pause/resume; ``delete()`` removes it if the + # client created it. + # * ``"tar"``: at ``stop()`` time the workspace is tarred via exec, + # pulled down through ``files.adownload``, and embedded into the + # session state. On ``resume()`` (or any subsequent ``start()``) + # the tar is uploaded and extracted into ``manifest.root``. + workspace_persistence: Literal["volume", "tar"] | None = None + + # Volume spec passed to ``client.create.volume()``. Forwarded into the + # ``spec`` block verbatim, so callers can set + # ``storageSize`` / ``accessMode`` / ``storageClassName``. Ignored + # unless ``workspace_persistence == "volume"``. + volume_spec: dict[str, Any] | None = None + + # Attach a pre-existing caller-owned volume instead of creating one. + # The volume's mount path (set when the caller created the volume) + # must already cover ``manifest.root`` — Northflank's attach API + # does not accept a mount path override. The provider records + # ``owned_volume=False`` so ``delete()`` detaches but never deletes + # the volume. Mutually exclusive with ``volume_spec``. + volume_id: str | None = None + + +class NorthflankSandboxSessionState(SandboxSessionState): + """Serializable state for a Northflank-backed session. + + ``owned_by_client`` records whether the client created the service and + is therefore allowed to delete it on cleanup. Resumed sessions inherit + this flag from the persisted payload. + + Persistence-mode bookkeeping: + + * ``workspace_persistence`` records which strategy ``create()`` picked + so resumed sessions keep the same behaviour. + * ``volume_id`` / ``owned_volume`` (volume mode) track the workspace + volume the client provisioned and whether ``delete()`` should remove + it on cleanup. + * ``persisted_workspace_tar_b64`` (tar mode) holds the base64-encoded + workspace tar captured at ``stop()``; on resume the session + uploads and extracts it back into ``manifest.root``. + """ + + type: Literal["northflank"] = "northflank" + project_id: str + service_id: str + team_id: str | None = None + instance_name: str | None = None + container_name: str | None = None + owned_by_client: bool = False + exec_timeout_s: float = 60.0 + workspace_persistence: Literal["volume", "tar"] | None = None + volume_id: str | None = None + owned_volume: bool = False + persisted_workspace_tar_b64: str | None = None + + +# --------------------------------------------------------------------------- +# Module-private helpers +# --------------------------------------------------------------------------- + + +def _build_docker_block(*, entrypoint: str | None, command: str | None) -> dict[str, Any] | None: + """Build the ``deployment.docker`` block from optional ENTRYPOINT/CMD overrides. + + Matches Northflank's ``CreateJobResponseDeploymentDocker`` shape: + ``configType`` is derived from which overrides are supplied. + """ + if entrypoint is None and command is None: + return None + block: dict[str, Any] = {} + if entrypoint is not None and command is not None: + block["configType"] = "customEntrypointCustomCommand" + block["customEntrypoint"] = entrypoint + block["customCommand"] = command + elif entrypoint is not None: + block["configType"] = "customEntrypoint" + block["customEntrypoint"] = entrypoint + else: + block["configType"] = "customCommand" + block["customCommand"] = command + return block + + +def _ensure_bytes(value: Any) -> bytes: + if isinstance(value, bytes): + return value + if isinstance(value, str): + return value.encode("utf-8", errors="replace") + return bytes(value) + + +def _link_target_stays_inside_archive(member_name: str, link_target: str) -> bool: + """Return True if ``link_target`` resolves inside the archive root. + + ``member_name`` is the entry's path (workspace-relative; never absolute + by the time we get here). ``link_target`` is its symlink/hardlink + target. The check rejects absolute targets and targets that walk above + the archive root via ``..``. + """ + if not link_target or link_target.startswith("/"): + return False + member_dir = PurePosixPath(member_name).parent + resolved_parts: list[str] = [] + for component in (member_dir / link_target).as_posix().split("/"): + if component in ("", "."): + continue + if component == "..": + if not resolved_parts: + return False + resolved_parts.pop() + continue + resolved_parts.append(component) + return True + + +def _validate_tar_payload(payload: bytes, *, target_root: Path) -> None: + """Reject tar archives that would escape ``target_root`` on extract. + + The hydration flow uploads the tar to ``/tmp`` inside the container and + then runs ``tar -xf`` against the workspace root. We can't easily + inject Python's ``filter='data'`` extraction policy across that + boundary, so the archive is inspected locally before being shipped. + + Rules mirror ``filter='data'``: + + * No absolute member names; no ``..`` traversal in names. + * Regular files and directories pass through unchanged. + * Symlinks and hardlinks are allowed only when the link target is + relative and resolves inside the archive root. + * Other entry types (devices, FIFOs, etc.) are rejected outright. + """ + + try: + tf = tarfile.open(fileobj=io.BytesIO(payload), mode="r:*") + except tarfile.TarError as exc: + raise WorkspaceArchiveWriteError( + path=target_root, + context={"reason": "invalid_tar"}, + cause=exc, + ) from exc + + try: + try: + members = tf.getmembers() + except (tarfile.TarError, OSError, EOFError) as exc: + # Truncated archives or malformed metadata can surface here + # rather than at ``open()``; treat any failure walking the + # member list as an unsafe archive. + raise WorkspaceArchiveWriteError( + path=target_root, + context={"reason": "tar_read_failed"}, + cause=exc, + ) from exc + + for member in members: + name = member.name + if not name or name.startswith("/"): + raise WorkspaceArchiveWriteError( + path=target_root, + context={"reason": "absolute_member", "member": name}, + ) + parts = PurePosixPath(name).parts + if any(part == ".." for part in parts): + raise WorkspaceArchiveWriteError( + path=target_root, + context={"reason": "traversal_member", "member": name}, + ) + if member.isfile() or member.isdir(): + continue + if member.issym() or member.islnk(): + if not _link_target_stays_inside_archive(name, member.linkname or ""): + raise WorkspaceArchiveWriteError( + path=target_root, + context={ + "reason": "unsafe_link_target", + "member": name, + "link_target": member.linkname or "", + }, + ) + continue + raise WorkspaceArchiveWriteError( + path=target_root, + context={ + "reason": "unsupported_member_type", + "member": name, + "type": str(member.type), + }, + ) + finally: + try: + tf.close() + except (tarfile.TarError, OSError): + # ``close`` can raise on malformed archives; the validation + # outcome has already been decided so swallow. + pass + + +# --------------------------------------------------------------------------- +# Session +# --------------------------------------------------------------------------- + + +class NorthflankSandboxSession(BaseSandboxSession): + """Sandbox session that runs in a Northflank service container.""" + + state: NorthflankSandboxSessionState + _client: AsyncApiClient + _running: bool + + def __init__( + self, + *, + state: NorthflankSandboxSessionState, + client: AsyncApiClient, + ) -> None: + self.state = state + self._client = client + self._running = True + + @classmethod + def from_state( + cls, + state: NorthflankSandboxSessionState, + *, + client: AsyncApiClient, + ) -> NorthflankSandboxSession: + return cls(state=state, client=client) + + # -- lifecycle hooks -------------------------------------------------- + + async def _prepare_backend_workspace(self) -> None: + # Make sure manifest.root exists inside the container before the + # base class tries to materialize the manifest there. + root = self.state.manifest.root + result = await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=["mkdir", "-p", root], + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + if result.exit_code not in (0, None): + raise WorkspaceArchiveWriteError( + path=Path(root), + context={ + "stage": "prepare_backend_workspace", + "stdout": result.stdout, + "stderr": result.stderr, + }, + ) + + # Tar-mode resume: replay the captured workspace tar before the + # base session populates the manifest. Skip when there is no + # captured payload — on first start there is nothing to restore. + if self.state.workspace_persistence == "tar" and self.state.persisted_workspace_tar_b64: + payload = base64.b64decode(self.state.persisted_workspace_tar_b64) + await self.hydrate_workspace(io.BytesIO(payload)) + + async def _after_start(self) -> None: + self._running = True + + async def _persist_snapshot(self) -> None: + # Tar mode: capture the workspace tar into session state before + # the base lifecycle (which also drives any user-supplied + # snapshot framework) runs. Stored as base64 so the state stays + # JSON-serialisable. + if self.state.workspace_persistence == "tar": + archive = await self.persist_workspace() + try: + payload = archive.read() + finally: + try: + archive.close() + except Exception: + pass + if isinstance(payload, str): + payload = payload.encode("utf-8") + self.state.persisted_workspace_tar_b64 = base64.b64encode(payload).decode("ascii") + + await super()._persist_snapshot() + + async def _shutdown_backend(self) -> None: + # No persistent exec channels to tear down — each command is a + # one-shot WebSocket. Mark the session not-running so future + # ``running()`` calls reflect cleanup. + self._running = False + + # -- runtime helpers + remote path validation ----------------------- + + def _runtime_helpers(self) -> tuple[RuntimeHelperScript, ...]: + # ``_validate_remote_path_access`` resolves symlinks and ``..`` + # components against the workspace root using a small shell helper + # installed inside the container. + return (RESOLVE_WORKSPACE_PATH_HELPER,) + + def _current_runtime_helper_cache_key(self) -> object | None: + # Invalidate the helper-installed cache whenever we point at a + # different Northflank service. + return self.state.service_id + + async def _validate_path_access(self, path: Path | str, *, for_write: bool = False) -> Path: + return await self._validate_remote_path_access(path, for_write=for_write) + + # -- exec ------------------------------------------------------------- + + async def _exec_internal( + self, + *command: str | Path, + timeout: float | None = None, + ) -> ExecResult: + argv = [str(c) for c in command] + try: + result = await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=argv, + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=timeout or self.state.exec_timeout_s, + ) + except TimeoutError as exc: + raise ExecTimeoutError(command=command, timeout_s=timeout, cause=exc) from exc + except Exception as exc: + raise ExecTransportError(command=command, cause=exc) from exc + + return ExecResult( + stdout=_ensure_bytes(result.stdout), + stderr=_ensure_bytes(result.stderr), + exit_code=result.exit_code if result.exit_code is not None else 0, + ) + + # -- file IO ---------------------------------------------------------- + + async def read(self, path: Path, *, user: str | User | None = None) -> io.IOBase: + """Read ``path`` via Northflank's file-copy API. + + Per-user file operations are not supported: Northflank's + ``files.adownload`` does not expose a per-call user override and + runs as whatever identity the exec proxy decided for the container + (typically the image's default WORKDIR user, often root). Passing + a non-None ``user`` raises :class:`NotImplementedError` so the + mismatch is loud — fall back to ``session.exec(..., user=...)`` + when user-scoped reads are required. + """ + if user is not None: + raise NotImplementedError( + "Northflank sandbox does not support per-user file operations; " + "use session.exec(..., user=...) instead." + ) + workspace_path = await self._validate_path_access(path) + remote = str(workspace_path) + basename = PurePosixPath(remote).name + # The Northflank SDK's ``_extract_download_tar`` treats local_path + # as a *directory* when it has no file extension and as a *file* + # otherwise. Always pass a temp directory and look the extracted + # file up by basename so both extension-less and extension-bearing + # remote names behave the same. + with tempfile.TemporaryDirectory() as tmpdir: + try: + await self._client.files.adownload( + project_id=self.state.project_id, + remote_path=remote, + local_path=tmpdir, + service_id=self.state.service_id, + team_id=self.state.team_id, + instance_name=self.state.instance_name, + container_name=self.state.container_name, + ) + except Exception as exc: + raise WorkspaceReadNotFoundError(path=workspace_path, cause=exc) from exc + extracted = Path(tmpdir) / basename + if not extracted.is_file(): + raise WorkspaceReadNotFoundError( + path=workspace_path, + context={"reason": "extracted_missing", "basename": basename}, + ) + data = extracted.read_bytes() + return io.BytesIO(data) + + async def write( + self, + path: Path, + data: io.IOBase, + *, + user: str | User | None = None, + ) -> None: + """Write ``data`` to ``path`` via Northflank's file-copy API. + + Per-user file operations are not supported (see :meth:`read` for + the rationale). A non-None ``user`` raises + :class:`NotImplementedError`; use ``session.exec(..., user=...)`` + when the write must happen as a specific Unix user. + """ + if user is not None: + raise NotImplementedError( + "Northflank sandbox does not support per-user file operations; " + "use session.exec(..., user=...) instead." + ) + workspace_path = await self._validate_path_access(path, for_write=True) + remote = PurePosixPath(str(workspace_path)) + parent = str(remote.parent) + basename = remote.name + + # Make sure the parent directory exists in the container. + mkdir = await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=["mkdir", "-p", parent], + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + if mkdir.exit_code not in (0, None): + raise WorkspaceArchiveWriteError( + path=Path(str(workspace_path)), + context={"stage": "mkdir_parent", "stderr": mkdir.stderr}, + ) + + payload = data.read() + if isinstance(payload, str): + payload = payload.encode("utf-8") + + # ``_upload_target`` in the Northflank SDK uses the remote_path's + # suffix to decide whether to rename the uploaded file. Extension- + # less targets like ``/workspace/Makefile`` get treated as + # directories and the local basename ends up inside them. Sidestep + # the heuristic by uploading a *staging directory* whose only entry + # has the correct basename, with ``remote_path`` pointing at the + # parent. The SDK then tars the directory contents into + # ``remote_path/`` verbatim, producing ``/`` + # regardless of suffix. + with tempfile.TemporaryDirectory() as tmpdir: + staging = Path(tmpdir) / "staging" + staging.mkdir() + (staging / basename).write_bytes(payload) + try: + await self._client.files.aupload( + project_id=self.state.project_id, + local_path=str(staging), + remote_path=parent, + service_id=self.state.service_id, + team_id=self.state.team_id, + instance_name=self.state.instance_name, + container_name=self.state.container_name, + ) + except Exception as exc: + raise WorkspaceArchiveWriteError(path=Path(str(workspace_path)), cause=exc) from exc + + # -- workspace tar round-trip ---------------------------------------- + + async def persist_workspace(self) -> io.IOBase: + root = self.state.manifest.root + archive_basename = f"nf-sandbox-{uuid.uuid4().hex}.tar" + remote_tar = f"/tmp/{archive_basename}" + + # Honour manifest ephemeral / runtime skip paths via tar excludes. + exclude_args = shell_tar_exclude_args(self._persist_workspace_skip_relpaths()) + tar_script = " ".join( + [ + "tar", + *exclude_args, + "-C", + shlex.quote(root), + "-cf", + shlex.quote(remote_tar), + ".", + ] + ) + tar_cmd = ["sh", "-lc", tar_script] + tar_result = await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=tar_cmd, + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + if tar_result.exit_code not in (0, None): + raise WorkspaceArchiveReadError( + path=Path(root), + context={"stage": "tar", "stderr": tar_result.stderr}, + ) + + # ``adownload`` treats local_path with no suffix as a directory and + # extracts the tar's contents into it. Pass a temp directory and + # look up the archive by remote basename afterwards. + with tempfile.TemporaryDirectory() as tmpdir: + try: + try: + await self._client.files.adownload( + project_id=self.state.project_id, + remote_path=remote_tar, + local_path=tmpdir, + service_id=self.state.service_id, + team_id=self.state.team_id, + instance_name=self.state.instance_name, + container_name=self.state.container_name, + ) + except Exception as exc: + raise WorkspaceArchiveReadError(path=Path(root), cause=exc) from exc + extracted = Path(tmpdir) / archive_basename + if not extracted.is_file(): + raise WorkspaceArchiveReadError( + path=Path(root), + context={ + "reason": "archive_missing", + "basename": archive_basename, + }, + ) + data = extracted.read_bytes() + finally: + # Best-effort cleanup of the in-container staging archive. + try: + await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=["rm", "-f", remote_tar], + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + except Exception: + pass + + return io.BytesIO(data) + + async def hydrate_workspace(self, data: io.IOBase) -> None: + root = self.state.manifest.root + archive_basename = f"nf-sandbox-{uuid.uuid4().hex}.tar" + remote_tar = f"/tmp/{archive_basename}" + + payload = data.read() + if isinstance(payload, str): + payload = payload.encode("utf-8") + + # We invoke ``tar -xf`` on the remote container so the SDK's + # ``filter='data'`` extraction policy does not apply. Inspect every + # member here before shipping the archive over the wire. + _validate_tar_payload(payload, target_root=Path(root)) + + # Stage the tar via a directory upload so ``remote_path``'s suffix + # heuristic does not rename it on the way in. + with tempfile.TemporaryDirectory() as tmpdir: + staging = Path(tmpdir) / "staging" + staging.mkdir() + (staging / archive_basename).write_bytes(payload) + try: + await self._client.files.aupload( + project_id=self.state.project_id, + local_path=str(staging), + remote_path="/tmp", + service_id=self.state.service_id, + team_id=self.state.team_id, + instance_name=self.state.instance_name, + container_name=self.state.container_name, + ) + except Exception as exc: + raise WorkspaceArchiveWriteError(path=Path(root), cause=exc) from exc + + try: + extract_cmd = [ + "sh", + "-lc", + f"mkdir -p {shlex.quote(root)} && " + f"tar -C {shlex.quote(root)} -xf {shlex.quote(remote_tar)}", + ] + extract_result = await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=extract_cmd, + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + if extract_result.exit_code not in (0, None): + raise WorkspaceArchiveWriteError( + path=Path(root), + context={ + "stage": "tar_extract", + "stderr": extract_result.stderr, + }, + ) + finally: + try: + await self._client.exec.arun_service_command( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + command=["rm", "-f", remote_tar], + shell="none", + instance_name=self.state.instance_name, + container_name=self.state.container_name, + timeout=self.state.exec_timeout_s, + ) + except Exception: + pass + + # -- status ----------------------------------------------------------- + + async def running(self) -> bool: + if not self._running: + return False + try: + response = await self._client.get.service( + project_id=self.state.project_id, + service_id=self.state.service_id, + team_id=self.state.team_id, + ) + except Exception: + return False + data = getattr(response, "data", response) or {} + if data.get("servicePaused"): + return False + deployment_status = ((data.get("status") or {}).get("deployment") or {}).get("status") + return deployment_status == "COMPLETED" + + +# --------------------------------------------------------------------------- +# Client +# --------------------------------------------------------------------------- + + +class NorthflankSandboxClient(BaseSandboxClient[NorthflankSandboxClientOptions]): + """Sandbox client backed by a Northflank service container. + + Pass a long-lived :class:`northflank.AsyncApiClient` at construction. + The client manages service-resource lifecycle (create / attach / + delete); the SDK client itself is not owned and must be closed by the + caller. + """ + + backend_id = "northflank" + supports_default_options = False + + def __init__( + self, + *, + client: AsyncApiClient, + instrumentation: Instrumentation | None = None, + ) -> None: + self._client = client + self._instrumentation = instrumentation or Instrumentation() + # ``BaseSandboxClient`` consults ``_dependencies`` via + # ``_resolve_dependencies()``; default to None. + self._dependencies = None + + # -- helpers --------------------------------------------------------- + + async def _create_ephemeral_service( + self, + options: NorthflankSandboxClientOptions, + *, + wait: bool = True, + ) -> str: + if not options.image_path: + raise ValueError( + "NorthflankSandboxClientOptions: either service_id or image_path must be set." + ) + service_name = f"{options.service_name_prefix}{uuid.uuid4().hex[:12]}" + deployment: dict[str, Any] = { + "instances": options.instances, + "external": {"imagePath": options.image_path}, + } + if options.image_credentials: + deployment["external"]["credentials"] = options.image_credentials + + docker_block = _build_docker_block( + entrypoint=options.docker_entrypoint, + command=options.docker_command, + ) + if docker_block is not None: + deployment["docker"] = docker_block + + payload = { + "name": service_name, + "billing": {"deploymentPlan": options.deployment_plan}, + "deployment": deployment, + } + response = await self._client.create.service.deployment( + project_id=options.project_id, + team_id=options.team_id, + data=payload, + ) + data = getattr(response, "data", response) or {} + service_id = data.get("id") or data.get("name") or service_name + if wait and options.wait_for_ready: + await self._wait_for_service_or_cleanup( + project_id=options.project_id, + service_id=service_id, + team_id=options.team_id, + timeout_s=options.wait_timeout_s, + ) + return service_id + + async def _wait_for_service_or_cleanup( + self, + *, + project_id: str, + service_id: str, + team_id: str | None, + timeout_s: float, + extra_cleanup: Callable[[], Awaitable[None]] | None = None, + ) -> None: + """Wait for the deployment to become ready; on failure delete the service. + + The deployment was already accepted by the API, so the service + exists on Northflank. Without a best-effort delete here a + readiness failure (timeout, FAILED deployment, ctrl-C) leaves + the service stranded with no SandboxSession to clean it up + later. ``extra_cleanup`` runs *after* the service is gone so + volume-mode can detach + delete the volume without fighting the + live runtime. + """ + try: + await self._client.helpers.wait_for_service_ready( + project_id=project_id, + service_id=service_id, + team_id=team_id, + timeout_s=timeout_s, + ) + except BaseException: + await self._best_effort_delete( + project_id=project_id, + service_id=service_id, + team_id=team_id, + ) + if extra_cleanup is not None: + try: + await extra_cleanup() + except Exception: + pass + raise + + async def _create_workspace_volume( + self, + *, + options: NorthflankSandboxClientOptions, + service_id: str, + root: str, + ) -> str: + """Provision a Northflank volume attached to ``service_id`` at ``root``. + + The volume is created with ``attachedObjects=[service]`` which both + creates the volume and configures the service to mount it at + ``root``. Returns the new volume id. + """ + # nf-multi-rw is a ReadWriteMany class; pair it with the matching + # access mode. 5120 MB is the smallest storageSize Northflank + # accepts on that class — smaller values are rejected at create. + spec = dict( + options.volume_spec + or { + "storageSize": 5120, + "accessMode": "ReadWriteMany", + "storageClassName": "nf-multi-rw", + } + ) + volume_name = f"{options.service_name_prefix}vol-{uuid.uuid4().hex[:8]}" + data: dict[str, Any] = { + "name": volume_name, + "mounts": [{"containerMountPath": root}], + "spec": spec, + "attachedObjects": [{"id": service_id, "type": "service"}], + } + response = await self._client.create.volume( + project_id=options.project_id, + team_id=options.team_id, + data=data, + ) + body = getattr(response, "data", response) or {} + volume_id = body.get("id") or body.get("name") or volume_name + return str(volume_id) + + async def _attach_workspace_volume( + self, + *, + options: NorthflankSandboxClientOptions, + service_id: str, + volume_id: str, + ) -> None: + """Attach a pre-existing caller-owned volume to ``service_id``. + + The volume's mount path is fixed at create time and not + overridable here, so the caller must have configured the volume + to mount at ``manifest.root``. A 409 from the API is treated as + "already attached" and tolerated so re-running create against + the same service is idempotent. + """ + try: + await self._client.attach.volume( + project_id=options.project_id, + volume_id=volume_id, + team_id=options.team_id, + data={"nfObject": {"id": service_id, "type": "service"}}, + ) + except ApiCallError as exc: + # 409 = already attached to this object. Anything else is a + # real failure (unknown volume, auth, etc.) and must surface. + if getattr(exc, "status", None) != 409: + raise + + async def _best_effort_delete( + self, + *, + project_id: str, + service_id: str, + team_id: str | None, + ) -> None: + try: + await self._client.delete.service( + project_id=project_id, + service_id=service_id, + team_id=team_id, + delete_child_objects=True, + ) + except Exception: + # Cleanup-path: never let a delete failure mask the original + # readiness error. The caller is already re-raising. + pass + + async def _best_effort_delete_volume( + self, + *, + project_id: str, + volume_id: str, + team_id: str | None, + attached_service_id: str | None = None, + ) -> None: + # Northflank tracks the attachment as volume metadata, so even + # after the service is gone the volume's ``attachedObjects`` may + # still reference it and reject delete. Detach first if we know + # the service id; tolerate every failure since this is a + # best-effort cleanup path. + if attached_service_id is not None: + try: + await self._client.detach.volume( + project_id=project_id, + volume_id=volume_id, + team_id=team_id, + data={"nfObject": {"id": attached_service_id, "type": "service"}}, + ) + except Exception: + pass + try: + await self._client.delete.volume( + project_id=project_id, + volume_id=volume_id, + team_id=team_id, + ) + except Exception: + pass + + async def _best_effort_detach_volume( + self, + *, + project_id: str, + volume_id: str, + team_id: str | None, + attached_service_id: str, + ) -> None: + """Detach ``volume_id`` from ``attached_service_id`` without deleting it. + + Used for caller-owned volumes on the cleanup path: we still + unwire the volume from our (now-defunct) service so the caller + can reattach or delete it themselves, but we never touch the + volume itself. + """ + try: + await self._client.detach.volume( + project_id=project_id, + volume_id=volume_id, + team_id=team_id, + data={"nfObject": {"id": attached_service_id, "type": "service"}}, + ) + except Exception: + pass + + # -- abstract methods ------------------------------------------------ + + async def create( + self, + *, + snapshot: SnapshotSpec | SnapshotBase | None = None, + manifest: Manifest | None = None, + options: NorthflankSandboxClientOptions, + ) -> SandboxSession: + if options.service_id and options.image_path: + raise ValueError( + "NorthflankSandboxClientOptions: pass either service_id or image_path, not both." + ) + if options.workspace_persistence == "volume": + if not options.image_path and not options.volume_id: + raise ValueError( + "NorthflankSandboxClientOptions: workspace_persistence='volume' requires " + "either image_path (client provisions a fresh service and volume) or " + "volume_id (attach a caller-owned volume to either a client-created " + "or pre-existing service)." + ) + if options.volume_id and options.volume_spec: + raise ValueError( + "NorthflankSandboxClientOptions: volume_spec is ignored when volume_id " + "is set — remove one of the two." + ) + elif options.volume_id: + raise ValueError( + "NorthflankSandboxClientOptions: volume_id requires workspace_persistence='volume'." + ) + + resolved_manifest = manifest or Manifest() + + volume_id: str | None = None + owned_volume = False + + if options.service_id: + service_id = options.service_id + owned_by_client = False + elif options.workspace_persistence == "volume": + # Create the service without waiting so we can attach the + # volume first; readiness is checked after the volume mount + # is in place. + service_id = await self._create_ephemeral_service(options, wait=False) + owned_by_client = True + else: + service_id = await self._create_ephemeral_service(options) + owned_by_client = True + + if options.workspace_persistence == "volume": + try: + if options.volume_id: + # Attach the caller's pre-existing volume. The volume + # is not provider-owned, so delete() will only detach + # it on cleanup. + await self._attach_workspace_volume( + options=options, + service_id=service_id, + volume_id=options.volume_id, + ) + volume_id = options.volume_id + owned_volume = False + else: + volume_id = await self._create_workspace_volume( + options=options, + service_id=service_id, + root=resolved_manifest.root, + ) + owned_volume = True + except BaseException: + if owned_by_client: + await self._best_effort_delete( + project_id=options.project_id, + service_id=service_id, + team_id=options.team_id, + ) + raise + + if owned_by_client and options.wait_for_ready: + resolved_volume_id = volume_id + cleanup_service_id = service_id + volume_is_owned = owned_volume + + async def _cleanup_volume() -> None: + if resolved_volume_id is None: + return + if volume_is_owned: + await self._best_effort_delete_volume( + project_id=options.project_id, + volume_id=resolved_volume_id, + team_id=options.team_id, + attached_service_id=cleanup_service_id, + ) + else: + # Caller-owned volume: detach only, never delete. + await self._best_effort_detach_volume( + project_id=options.project_id, + volume_id=resolved_volume_id, + team_id=options.team_id, + attached_service_id=cleanup_service_id, + ) + + await self._wait_for_service_or_cleanup( + project_id=options.project_id, + service_id=service_id, + team_id=options.team_id, + timeout_s=options.wait_timeout_s, + extra_cleanup=_cleanup_volume, + ) + + session_id = uuid.uuid4() + snapshot_instance = resolve_snapshot(snapshot, str(session_id)) + state = NorthflankSandboxSessionState( + session_id=session_id, + manifest=resolved_manifest, + snapshot=snapshot_instance, + project_id=options.project_id, + service_id=service_id, + team_id=options.team_id, + instance_name=options.instance_name, + container_name=options.container_name, + owned_by_client=owned_by_client, + exec_timeout_s=options.exec_timeout_s, + workspace_persistence=options.workspace_persistence, + volume_id=volume_id, + owned_volume=owned_volume, + ) + inner = NorthflankSandboxSession.from_state(state, client=self._client) + # Attach mode reuses a service that already exists, so its + # workspace is whatever the user left there — mark it preserved + # so the base session does not clear it on first start. + if options.service_id: + inner._set_start_state_preserved(True) + return self._wrap_session(inner, instrumentation=self._instrumentation) + + async def resume(self, state: SandboxSessionState) -> SandboxSession: + if not isinstance(state, NorthflankSandboxSessionState): + raise TypeError( + "NorthflankSandboxClient.resume expects a NorthflankSandboxSessionState" + ) + inner = NorthflankSandboxSession.from_state(state, client=self._client) + # Resume reattaches to a service that already exists, whose + # workspace either lives on a Northflank volume (volume mode), is + # restored from the tar embedded in state (tar mode), or simply + # carries over in-container (attach mode). In every case the base + # session must treat the workspace as preserved so it does not + # ``ls`` + clear the root — that path chokes on volume-mount + # setuid bits and would also drop the persisted content. + inner._set_start_state_preserved(True) + return self._wrap_session(inner, instrumentation=self._instrumentation) + + async def delete(self, session: SandboxSession) -> SandboxSession: + inner = session._inner + if not isinstance(inner, NorthflankSandboxSession): + raise TypeError("NorthflankSandboxClient.delete expects a NorthflankSandboxSession") + # Cleanup order: stop the runtime first, then unwire the volume. + # + # 1. Delete the service so nothing is still using the volume mount. + # 2. Detach any volume from the (now-defunct) service. Northflank + # does not auto-detach volumes when their attached service is + # deleted — the volume's ``attachedObjects`` still references + # the dead service id, which would block a subsequent delete + # and leave the caller's volume wired to a ghost service. + # 3. Delete the volume *only* if the client created it. A + # caller-supplied volume (owned_volume=False) is left in + # place: detach yes, delete never. + if inner.state.owned_by_client: + try: + await self._client.delete.service( + project_id=inner.state.project_id, + service_id=inner.state.service_id, + team_id=inner.state.team_id, + delete_child_objects=True, + ) + except ApiCallError as exc: + # Treat "already gone" as success. Surface every other API + # error (auth, server-side, etc.) so cleanup failures are + # visible — otherwise leaked services accumulate silently. + if getattr(exc, "status", None) != 404: + raise + if inner.state.volume_id: + try: + await self._client.detach.volume( + project_id=inner.state.project_id, + volume_id=inner.state.volume_id, + team_id=inner.state.team_id, + data={"nfObject": {"id": inner.state.service_id, "type": "service"}}, + ) + except ApiCallError as exc: + # 404 = volume already gone; 409 / 4xx with "not attached" + # = the attachment was already gone. Either way we want + # to fall through to the (optional) delete. + if getattr(exc, "status", None) not in (404, 409): + raise + if inner.state.owned_volume: + try: + await self._client.delete.volume( + project_id=inner.state.project_id, + volume_id=inner.state.volume_id, + team_id=inner.state.team_id, + ) + except ApiCallError as exc: + if getattr(exc, "status", None) != 404: + raise + return session + + def deserialize_session_state(self, payload: dict[str, object]) -> SandboxSessionState: + return NorthflankSandboxSessionState.model_validate(payload) diff --git a/tests/extensions/northflank/test_tools_shell.py b/tests/extensions/northflank/test_tools_shell.py new file mode 100644 index 0000000000..8eae858dbd --- /dev/null +++ b/tests/extensions/northflank/test_tools_shell.py @@ -0,0 +1,212 @@ +"""Smoke tests: imports, namespace registration, tool schemas.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from agents.extensions.northflank import ( + NorthflankCtx, + NorthflankShellExecutor, + nf_get_service, + nf_list_projects, + northflank_tools, +) +from agents.extensions.northflank._helpers import wrap_shell_command +from agents.tool import ( + FunctionTool, + ShellActionRequest, + ShellCallData, + ShellCommandRequest, + ShellResult, +) + + +def test_imports() -> None: + assert callable(northflank_tools) + assert isinstance(nf_list_projects, FunctionTool) + assert isinstance(nf_get_service, FunctionTool) + + +def test_default_namespace_has_no_gated_tools() -> None: + tools = northflank_tools() + names = {t.name for t in tools} + assert "nf_list_projects" in names + assert "nf_create_deployment_service" in names + assert "nf_run_service_command" in names + assert "nf_delete_service" not in names + assert "nf_put_secret" not in names + + +def test_namespace_metadata_applied() -> None: + tools = northflank_tools(namespace="ops", description="custom desc") + for tool in tools: + # tool_namespace() copies tools and attaches private metadata fields. + assert getattr(tool, "_tool_namespace", None) == "ops" + assert getattr(tool, "_tool_namespace_description", None) == "custom desc" + + +def test_gated_tools_optin() -> None: + base = {t.name for t in northflank_tools()} + full = { + t.name + for t in northflank_tools( + include_delete=True, + include_secrets=True, + include_volumes=True, + include_domains=True, + ) + } + extras = full - base + assert extras == { + "nf_delete_service", + "nf_put_secret", + "nf_delete_secret", + "nf_delete_volume", + "nf_delete_domain", + } + + +def test_mutating_tools_require_approval() -> None: + tools = {t.name: t for t in northflank_tools(include_delete=True, include_secrets=True)} + must_approve = { + "nf_create_deployment_service", + "nf_restart_service", + "nf_pause_service", + "nf_resume_service", + "nf_scale_service", + "nf_run_service_command", + "nf_run_job_command", + "nf_run_addon_command", + "nf_delete_service", + "nf_put_secret", + "nf_delete_secret", + } + for name in must_approve: + assert tools[name].needs_approval is True, f"{name} should require approval" + + +def test_read_tools_no_approval() -> None: + tools = {t.name: t for t in northflank_tools()} + for name in ("nf_list_projects", "nf_get_project", "nf_list_services", "nf_get_service"): + assert tools[name].needs_approval is False, f"{name} should not require approval" + + +def test_tools_emit_json_schema_without_ctx_param() -> None: + """Each tool exposes a JSON schema; the RunContextWrapper param is hidden.""" + for tool in northflank_tools(include_delete=True): + schema = tool.params_json_schema + assert isinstance(schema, dict) + properties = schema.get("properties", {}) + # ``ctx`` (RunContextWrapper) should not leak into the wire schema. + assert "ctx" not in properties, f"{tool.name} leaks ctx into its schema" + + +def test_shell_executor_construct() -> None: + executor = NorthflankShellExecutor(service_id="svc-xyz") + assert executor.service_id == "svc-xyz" + assert executor.shell == "sh" + assert callable(executor) + + +class _FakeExec: + def __init__(self, recorder: list[dict[str, Any]]) -> None: + self._recorder = recorder + + async def arun_service_command(self, **kwargs): + self._recorder.append(kwargs) + + class _Result: + exit_code = 0 + stdout = "hi\n" + stderr = "" + status = "completed" + + return _Result() + + +class _FakeClient: + def __init__(self, recorder: list[dict[str, Any]]) -> None: + self.exec = _FakeExec(recorder) + + +class _FakeRunCtx: + def __init__(self, context: object) -> None: + self.context = context + + +@pytest.mark.asyncio +async def test_shell_executor_wraps_in_sh_dash_lc() -> None: + recorder: list[dict[str, Any]] = [] + client = _FakeClient(recorder) + ctx = NorthflankCtx(client=client, project_id="proj-1") + executor = NorthflankShellExecutor(service_id="svc-1") # default shell="sh" + request = ShellCommandRequest( + ctx_wrapper=_FakeRunCtx(ctx), # type: ignore[arg-type] + data=ShellCallData( + call_id="call-1", + action=ShellActionRequest(commands=["echo hi", "uname -a"]), + ), + ) + result = await executor(request) + assert isinstance(result, ShellResult) + assert len(result.output) == 2 + assert result.output[0].stdout == "hi\n" + assert result.output[0].outcome.type == "exit" + assert result.output[0].outcome.exit_code == 0 + # The SDK's ``shell`` field is forwarded verbatim to the exec proxy and + # only ``"none"`` is meaningful — we have to invoke the shell ourselves. + assert all(c["shell"] == "none" for c in recorder) + assert recorder[0]["command"] == ["sh", "-lc", "echo hi"] + assert recorder[1]["command"] == ["sh", "-lc", "uname -a"] + assert all(c["service_id"] == "svc-1" and c["project_id"] == "proj-1" for c in recorder) + + +def test_wrap_shell_command_translation() -> None: + # ``none`` passes the string straight through. + cmd, shell = wrap_shell_command("ls -la", "none") + assert cmd == "ls -la" + assert shell == "none" + # ``sh`` and ``bash`` always set the SDK shell to ``none`` and wrap the + # command as an explicit argv, since the proxy only understands ``none``. + cmd, shell = wrap_shell_command("ls -la | head", "sh") + assert cmd == ["sh", "-lc", "ls -la | head"] + assert shell == "none" + cmd, shell = wrap_shell_command("echo $FOO", "bash") + assert cmd == ["bash", "-lc", "echo $FOO"] + assert shell == "none" + + +@pytest.mark.asyncio +async def test_shell_executor_shell_none_passes_through() -> None: + recorder: list[dict[str, Any]] = [] + client = _FakeClient(recorder) + ctx = NorthflankCtx(client=client, project_id="proj-1") + executor = NorthflankShellExecutor(service_id="svc-1", shell="none") + request = ShellCommandRequest( + ctx_wrapper=_FakeRunCtx(ctx), # type: ignore[arg-type] + data=ShellCallData( + call_id="call-1", + action=ShellActionRequest(commands=["ls"]), + ), + ) + await executor(request) + assert recorder[0]["command"] == "ls" + assert recorder[0]["shell"] == "none" + + +@pytest.mark.asyncio +async def test_shell_executor_raises_without_project() -> None: + client = _FakeClient([]) + ctx = NorthflankCtx(client=client) # no project_id + executor = NorthflankShellExecutor(service_id="svc-1") + request = ShellCommandRequest( + ctx_wrapper=_FakeRunCtx(ctx), # type: ignore[arg-type] + data=ShellCallData( + call_id="call-1", + action=ShellActionRequest(commands=["echo hi"]), + ), + ) + with pytest.raises(RuntimeError, match="project_id"): + await executor(request) diff --git a/tests/extensions/sandbox/test_northflank.py b/tests/extensions/sandbox/test_northflank.py new file mode 100644 index 0000000000..12b08e8fb0 --- /dev/null +++ b/tests/extensions/sandbox/test_northflank.py @@ -0,0 +1,1376 @@ +"""NorthflankSandboxClient + NorthflankSandboxSession tests. + +The Northflank SDK is faked so tests are fully offline. We assert on the +recorded SDK calls — that's what guarantees we match the wire contract. +""" + +from __future__ import annotations + +import io +from pathlib import Path +from typing import Any + +import pytest +from northflank import ApiCallError + +from agents.extensions.sandbox.northflank import ( + NorthflankSandboxClient, + NorthflankSandboxClientOptions, + NorthflankSandboxSession, + NorthflankSandboxSessionState, +) +from agents.sandbox.errors import ( + WorkspaceArchiveWriteError, + WorkspaceReadNotFoundError, +) +from agents.sandbox.manifest import Manifest +from agents.sandbox.session import SandboxSession, SandboxSessionState +from agents.sandbox.snapshot import NoopSnapshotSpec + +# --------------------------------------------------------------------------- +# Fakes +# --------------------------------------------------------------------------- + + +class _FakeExecResult: + def __init__( + self, + *, + exit_code: int = 0, + stdout: str = "", + stderr: str = "", + status: str = "completed", + ) -> None: + self.exit_code = exit_code + self.stdout = stdout + self.stderr = stderr + self.status = status + self.message = "" + + @property + def ok(self) -> bool: + return self.exit_code == 0 + + +_HELPER_PATH_PREFIX = "/tmp/openai-agents/bin/resolve-workspace-path" + + +def _looks_like_resolve_helper_invocation(cmd: tuple[Any, ...]) -> bool: + return bool(cmd) and isinstance(cmd[0], str) and cmd[0].startswith(_HELPER_PATH_PREFIX) + + +def _looks_like_helper_install(cmd: tuple[Any, ...]) -> bool: + return ( + len(cmd) >= 5 + and cmd[0] == "sh" + and cmd[1] == "-c" + and isinstance(cmd[2], str) + and "INSTALL_RUNTIME_HELPER_V1" in cmd[2] + and cmd[3] == "sh" + and isinstance(cmd[4], str) + and cmd[4].startswith(_HELPER_PATH_PREFIX) + ) + + +def _looks_like_helper_present_test(cmd: tuple[Any, ...]) -> bool: + return ( + len(cmd) >= 3 + and cmd[0] == "test" + and cmd[1] == "-x" + and isinstance(cmd[2], str) + and cmd[2].startswith(_HELPER_PATH_PREFIX) + ) + + +class _FakeExec: + """Fake Northflank exec channel. + + By default it auto-handles the openai-agents resolve-workspace-path + runtime helper: install commands return ok, presence checks return ok, + and helper invocations echo the requested workspace path back as stdout + so ``_validate_remote_path_access`` accepts the path. ``user_calls`` + (re-)exposes only the non-helper invocations for assertion purposes. + """ + + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + self.next_result: _FakeExecResult = _FakeExecResult() + self.results_by_prefix: list[tuple[tuple[str, ...], _FakeExecResult]] = [] + + def queue(self, command_prefix: tuple[str, ...], result: _FakeExecResult) -> None: + self.results_by_prefix.append((command_prefix, result)) + + @property + def user_calls(self) -> list[dict[str, Any]]: + """Calls that are not part of the runtime-helper install/probe round trip.""" + return [ + c + for c in self.calls + if not ( + _looks_like_helper_install(tuple(c.get("command") or ())) + or _looks_like_helper_present_test(tuple(c.get("command") or ())) + or _looks_like_resolve_helper_invocation(tuple(c.get("command") or ())) + ) + ] + + async def arun_service_command(self, **kwargs: Any) -> _FakeExecResult: + self.calls.append(kwargs) + cmd = tuple(kwargs.get("command") or ()) + if _looks_like_resolve_helper_invocation(cmd): + # Echo back the workspace path so _validate_remote_path_access + # succeeds. The helper signature is: + # [grants...] + workspace = str(cmd[2]) if len(cmd) >= 3 else "/workspace" + return _FakeExecResult(stdout=workspace) + if _looks_like_helper_install(cmd) or _looks_like_helper_present_test(cmd): + return _FakeExecResult() + for prefix, result in self.results_by_prefix: + if cmd[: len(prefix)] == prefix: + return result + return self.next_result + + +class _FakeFiles: + """Fake Northflank files endpoint. + + ``download_payload`` is written to ``/`` + so the session's read/persist paths — which pass a temp *directory* + as ``local_path`` — can look up the file by its remote basename, + matching the real SDK's directory-target semantics for + ``_extract_download_tar``. + """ + + def __init__(self) -> None: + self.uploads: list[dict[str, Any]] = [] + self.downloads: list[dict[str, Any]] = [] + self.download_payload: bytes = b"" + self.raise_on_download: Exception | None = None + + async def aupload(self, **kwargs: Any) -> None: + self.uploads.append(kwargs) + + async def adownload(self, **kwargs: Any) -> None: + self.downloads.append(kwargs) + if self.raise_on_download is not None: + raise self.raise_on_download + from pathlib import Path as _P, PurePosixPath as _PP + + local = _P(kwargs["local_path"]) + local.mkdir(parents=True, exist_ok=True) + target = local / _PP(kwargs["remote_path"]).name + target.write_bytes(self.download_payload) + + +class _FakeServiceEndpoint: + """Mirrors the SDK's CallableNamespace pattern: the endpoint is the + namespace itself, invoked directly via __call__.""" + + def __init__(self) -> None: + self.response_data: dict[str, Any] = { + "id": "svc-new", + "name": "svc-new", + } + self.calls: list[dict[str, Any]] = [] + self.raise_on_call: Exception | None = None + + async def __call__(self, **kwargs: Any) -> Any: + self.calls.append(kwargs) + if self.raise_on_call is not None: + raise self.raise_on_call + + class _R: + data = self.response_data + + return _R() + + +class _FakeGetService(_FakeServiceEndpoint): + def __init__(self) -> None: + super().__init__() + self.response_data = { + "servicePaused": False, + "status": {"deployment": {"status": "COMPLETED"}}, + } + + +class _FakeNamespace: + pass + + +class _FakeHelpers: + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + self.raise_on_call: Exception | None = None + + async def wait_for_service_ready(self, **kwargs: Any) -> dict[str, Any]: + self.calls.append(kwargs) + if self.raise_on_call is not None: + raise self.raise_on_call + return {"status": {"deployment": {"status": "COMPLETED"}}} + + +class _FakeClient: + def __init__(self) -> None: + self.exec = _FakeExec() + self.files = _FakeFiles() + self.helpers = _FakeHelpers() + + self._get_service = _FakeGetService() + self.get = _FakeNamespace() + self.get.service = self._get_service # type: ignore[attr-defined] + + self._create_deployment = _FakeServiceEndpoint() + self.create = _FakeNamespace() + self.create.service = _FakeNamespace() # type: ignore[attr-defined] + self.create.service.deployment = self._create_deployment # type: ignore[attr-defined] + + self._delete_service = _FakeServiceEndpoint() + self._delete_volume = _FakeServiceEndpoint() + self.delete = _FakeNamespace() + self.delete.service = self._delete_service # type: ignore[attr-defined] + self.delete.volume = self._delete_volume # type: ignore[attr-defined] + + self._create_volume = _FakeServiceEndpoint() + self._create_volume.response_data = {"id": "vol-new", "name": "vol-new"} + self.create.volume = self._create_volume # type: ignore[attr-defined] + + self._detach_volume = _FakeServiceEndpoint() + self.detach = _FakeNamespace() + self.detach.volume = self._detach_volume # type: ignore[attr-defined] + + self._attach_volume = _FakeServiceEndpoint() + self.attach = _FakeNamespace() + self.attach.volume = self._attach_volume # type: ignore[attr-defined] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_state( + *, + service_id: str = "svc-1", + project_id: str = "proj-1", + owned: bool = False, + root: str = "/workspace", +) -> NorthflankSandboxSessionState: + return NorthflankSandboxSessionState( + manifest=Manifest(root=root), + snapshot=NoopSnapshotSpec().build("snap-1"), + project_id=project_id, + service_id=service_id, + owned_by_client=owned, + ) + + +def _make_session(client: _FakeClient, **state_kwargs: Any) -> NorthflankSandboxSession: + state = _make_state(**state_kwargs) + return NorthflankSandboxSession(state=state, client=client) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_options_round_trip() -> None: + opts = NorthflankSandboxClientOptions(project_id="proj", service_id="svc", team_id="team-x") + dumped = opts.model_dump() + assert dumped["type"] == "northflank" + assert dumped["project_id"] == "proj" + assert dumped["service_id"] == "svc" + + +def test_deserialize_session_state_registry() -> None: + client = NorthflankSandboxClient(client=_FakeClient()) + state = _make_state() + raw = state.model_dump() + revived = client.deserialize_session_state(raw) + assert isinstance(revived, NorthflankSandboxSessionState) + assert revived.service_id == state.service_id + + # The polymorphic registry should pick up our subclass too. + again = SandboxSessionState.parse(raw) + assert isinstance(again, NorthflankSandboxSessionState) + + +def test_backend_id_constants() -> None: + assert NorthflankSandboxClient.backend_id == "northflank" + assert NorthflankSandboxClient.supports_default_options is False + + +@pytest.mark.asyncio +async def test_exec_internal_passes_argv_to_sdk_with_shell_none() -> None: + client = _FakeClient() + session = _make_session(client) + result = await session._exec_internal("ls", "-la", "/workspace", timeout=5.0) + assert result.exit_code == 0 + assert isinstance(result.stdout, bytes) + assert isinstance(result.stderr, bytes) + assert len(client.exec.user_calls) == 1 + call = client.exec.user_calls[0] + assert call["command"] == ["ls", "-la", "/workspace"] + assert call["shell"] == "none" + assert call["service_id"] == "svc-1" + assert call["project_id"] == "proj-1" + assert call["timeout"] == 5.0 + + +@pytest.mark.asyncio +async def test_exec_internal_falls_back_to_default_timeout() -> None: + client = _FakeClient() + session = _make_session(client) + await session._exec_internal("echo", "hi") + assert client.exec.user_calls[0]["timeout"] == session.state.exec_timeout_s + + +@pytest.mark.asyncio +async def test_running_true_when_deployment_completed() -> None: + client = _FakeClient() + session = _make_session(client) + assert await session.running() is True + + +@pytest.mark.asyncio +async def test_running_false_when_paused() -> None: + client = _FakeClient() + client._get_service.response_data = { + "servicePaused": True, + "status": {"deployment": {"status": "COMPLETED"}}, + } + session = _make_session(client) + assert await session.running() is False + + +@pytest.mark.asyncio +async def test_running_false_after_shutdown() -> None: + client = _FakeClient() + session = _make_session(client) + await session._shutdown_backend() + assert await session.running() is False + + +@pytest.mark.asyncio +async def test_read_downloads_via_files_api() -> None: + client = _FakeClient() + client.files.download_payload = b"hello world" + session = _make_session(client) + stream = await session.read(Path("/workspace/hello.txt")) + assert stream.read() == b"hello world" + assert len(client.files.downloads) == 1 + assert client.files.downloads[0]["remote_path"] == "/workspace/hello.txt" + assert client.files.downloads[0]["service_id"] == "svc-1" + + +@pytest.mark.asyncio +async def test_read_with_user_raises_not_implemented() -> None: + client = _FakeClient() + session = _make_session(client) + with pytest.raises(NotImplementedError, match="per-user"): + await session.read(Path("/workspace/x"), user="root") + + +@pytest.mark.asyncio +async def test_write_with_user_raises_not_implemented() -> None: + client = _FakeClient() + session = _make_session(client) + with pytest.raises(NotImplementedError, match="per-user"): + await session.write(Path("/workspace/x"), io.BytesIO(b"hi"), user="root") + + +@pytest.mark.asyncio +async def test_read_missing_path_raises_workspace_read_not_found() -> None: + client = _FakeClient() + client.files.raise_on_download = RuntimeError("404 not found") + session = _make_session(client) + with pytest.raises(WorkspaceReadNotFoundError): + await session.read(Path("/workspace/missing.txt")) + + +@pytest.mark.asyncio +async def test_write_mkdirs_then_uploads_directory_with_correct_basename() -> None: + client = _FakeClient() + captured: dict[str, Any] = {} + original_upload = client.files.aupload + + async def capturing_upload(**kwargs: Any) -> None: + local_dir = Path(kwargs["local_path"]) + captured["local_dir"] = str(local_dir) + captured["entries"] = sorted(p.name for p in local_dir.iterdir()) + target = local_dir / captured["entries"][0] + captured["bytes"] = target.read_bytes() + await original_upload(**kwargs) + + client.files.aupload = capturing_upload # type: ignore[method-assign] + session = _make_session(client) + await session.write(Path("/workspace/sub/dir/file.txt"), io.BytesIO(b"payload")) + + # mkdir -p of the parent fires first. + mkdir_call = client.exec.user_calls[0] + assert mkdir_call["command"] == ["mkdir", "-p", "/workspace/sub/dir"] + # Upload is a *directory* targeted at the parent so the SDK's directory + # extraction places the file at / regardless of suffix. + assert len(client.files.uploads) == 1 + upload = client.files.uploads[0] + assert upload["remote_path"] == "/workspace/sub/dir" + assert captured["entries"] == ["file.txt"] + assert captured["bytes"] == b"payload" + + +@pytest.mark.asyncio +async def test_write_handles_extensionless_paths() -> None: + """An extensionless remote like /workspace/Makefile must not be treated + as a directory by the SDK's upload heuristic.""" + client = _FakeClient() + captured: dict[str, Any] = {} + original_upload = client.files.aupload + + async def capturing_upload(**kwargs: Any) -> None: + local_dir = Path(kwargs["local_path"]) + captured["entries"] = sorted(p.name for p in local_dir.iterdir()) + captured["remote_path"] = kwargs["remote_path"] + await original_upload(**kwargs) + + client.files.aupload = capturing_upload # type: ignore[method-assign] + session = _make_session(client) + await session.write(Path("/workspace/Makefile"), io.BytesIO(b"all: build\n")) + + # The staging directory has exactly one entry — the basename Makefile — + # and remote_path is the parent directory. + assert captured["entries"] == ["Makefile"] + assert captured["remote_path"] == "/workspace" + + +@pytest.mark.asyncio +async def test_write_propagates_upload_failure_as_archive_write_error() -> None: + client = _FakeClient() + + async def boom(**kwargs: Any) -> None: + raise RuntimeError("permission denied") + + client.files.aupload = boom # type: ignore[method-assign] + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.write(Path("/workspace/x"), io.BytesIO(b"hi")) + + +def _make_valid_tar_bytes(entries: dict[str, bytes]) -> bytes: + """Build a tar archive in memory with safe member names.""" + import tarfile + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w") as tf: + for name, data in entries.items(): + info = tarfile.TarInfo(name=name) + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + return buf.getvalue() + + +@pytest.mark.asyncio +async def test_hydrate_then_persist_workspace_round_trip() -> None: + client = _FakeClient() + session = _make_session(client) + tar_bytes = _make_valid_tar_bytes({"./hello.txt": b"hi\n"}) + + # Hydrate uploads (staging dir) + extracts on remote. + upload_snapshot: dict[str, Any] = {} + original_upload = client.files.aupload + + async def snapshotting_upload(**kwargs: Any) -> None: + local_dir = Path(kwargs["local_path"]) + upload_snapshot["is_dir"] = local_dir.is_dir() + upload_snapshot["entries"] = sorted(p.name for p in local_dir.iterdir()) + await original_upload(**kwargs) + + client.files.aupload = snapshotting_upload # type: ignore[method-assign] + + await session.hydrate_workspace(io.BytesIO(tar_bytes)) + user_cmds = [tuple(c["command"]) for c in client.exec.user_calls] + assert any("tar" in " ".join(cmd) for cmd in user_cmds) + assert any(cmd and cmd[0] == "rm" for cmd in user_cmds) + assert len(client.files.uploads) == 1 + assert upload_snapshot["is_dir"] is True + # The staging dir held exactly one entry — the per-call archive basename. + assert len(upload_snapshot["entries"]) == 1 + assert upload_snapshot["entries"][0].endswith(".tar") + assert client.files.uploads[0]["remote_path"] == "/tmp" + + # Persist: tar in container, download to a directory, rm. + client.exec.calls.clear() + client.files.download_payload = tar_bytes + stream = await session.persist_workspace() + assert stream.read() == tar_bytes + first = client.exec.user_calls[0] + assert "tar" in first["command"][2] + assert len(client.files.downloads) >= 1 + + +@pytest.mark.asyncio +async def test_hydrate_rejects_absolute_member() -> None: + client = _FakeClient() + session = _make_session(client) + bad = _make_valid_tar_bytes({"/etc/passwd": b"x"}) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(bad)) + + +@pytest.mark.asyncio +async def test_hydrate_rejects_traversal_member() -> None: + client = _FakeClient() + session = _make_session(client) + bad = _make_valid_tar_bytes({"../escape": b"x"}) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(bad)) + + +def _make_symlink_tar(name: str, linkname: str) -> bytes: + import tarfile + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w") as tf: + info = tarfile.TarInfo(name=name) + info.type = tarfile.SYMTYPE + info.linkname = linkname + tf.addfile(info) + return buf.getvalue() + + +@pytest.mark.asyncio +async def test_hydrate_rejects_absolute_symlink_target() -> None: + client = _FakeClient() + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("link", "/etc/passwd"))) + + +@pytest.mark.asyncio +async def test_hydrate_rejects_symlink_escaping_archive() -> None: + client = _FakeClient() + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("sub/link", "../../outside"))) + + +@pytest.mark.asyncio +async def test_hydrate_allows_relative_symlink_within_archive() -> None: + client = _FakeClient() + session = _make_session(client) + # ``sub/link`` -> ``../other.txt`` resolves to the archive's ``other.txt``, + # which is inside the root — allowed. + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("sub/link", "../other.txt"))) + # Upload + extract + cleanup ran without raising. + assert len(client.files.uploads) == 1 + assert any( + tuple(c["command"])[:1] == ("sh",) and "tar" in c["command"][2] + for c in client.exec.user_calls + ) + + +@pytest.mark.asyncio +async def test_hydrate_rejects_root_level_dotdot_symlink() -> None: + """A symlink at the archive root pointing to ``..`` escapes — there + is no parent inside the archive to pop into.""" + client = _FakeClient() + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("link", ".."))) + + +@pytest.mark.asyncio +async def test_hydrate_allows_symlink_pointing_to_sibling() -> None: + """``a/link`` -> ``b.txt`` resolves to ``a/b.txt`` (same directory).""" + client = _FakeClient() + session = _make_session(client) + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("a/link", "b.txt"))) + assert len(client.files.uploads) == 1 + + +@pytest.mark.asyncio +async def test_hydrate_allows_symlink_resolving_to_archive_root() -> None: + """``a/link`` -> ``..`` resolves to the archive root itself — inside, + so allowed (a symlink to the workspace root is a weird but legal + archive entry, not an escape).""" + client = _FakeClient() + session = _make_session(client) + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("a/link", ".."))) + assert len(client.files.uploads) == 1 + + +@pytest.mark.asyncio +async def test_hydrate_handles_collapsed_double_slashes_in_link_target() -> None: + """``a//b`` is just ``a/b`` after PurePosixPath normalisation; the + helper must not treat the empty component as a traversal escape.""" + client = _FakeClient() + session = _make_session(client) + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("a/link", "..//b.txt"))) + assert len(client.files.uploads) == 1 + + +@pytest.mark.asyncio +async def test_hydrate_rejects_link_target_with_just_enough_dotdots_to_escape() -> None: + """``a/b/link`` -> ``../../..`` walks one segment past root — reject.""" + client = _FakeClient() + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(_make_symlink_tar("a/b/link", "../../.."))) + + +@pytest.mark.asyncio +async def test_hydrate_wraps_non_tar_bytes_as_archive_write_error() -> None: + """Bytes that aren't a tar at all should surface as a + WorkspaceArchiveWriteError (the ``invalid_tar`` path), not the raw + tarfile exception.""" + client = _FakeClient() + session = _make_session(client) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(b"not a tar at all")) + + +@pytest.mark.asyncio +async def test_hydrate_rejects_device_member() -> None: + import tarfile + + client = _FakeClient() + session = _make_session(client) + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w") as tf: + info = tarfile.TarInfo(name="dev/null") + info.type = tarfile.CHRTYPE + info.devmajor = 1 + info.devminor = 3 + tf.addfile(info) + with pytest.raises(WorkspaceArchiveWriteError): + await session.hydrate_workspace(io.BytesIO(buf.getvalue())) + + +@pytest.mark.asyncio +async def test_persist_workspace_emits_exclude_args_from_skip_paths() -> None: + client = _FakeClient() + session = _make_session(client) + rel = session.register_persist_workspace_skip_path(".cache") + + client.files.download_payload = _make_valid_tar_bytes({"./x": b"y"}) + await session.persist_workspace() + first = client.exec.user_calls[0] + tar_script = first["command"][2] + # shell_tar_exclude_args emits both bare and ``./``-prefixed forms. + assert "--exclude=.cache" in tar_script or "--exclude='.cache'" in tar_script + assert "--exclude=./.cache" in tar_script or "--exclude='./.cache'" in tar_script + # The registered path was the one we requested. + assert str(rel) == ".cache" + + +# -- client lifecycle ------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_create_attach_mode_does_not_create_service() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions(project_id="proj", service_id="svc-existing") + session = await sandbox_client.create(options=options) + assert isinstance(session, SandboxSession) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.service_id == "svc-existing" + assert inner.state.owned_by_client is False + # No deployment creation, no helpers waiting. + assert client._create_deployment.calls == [] + assert client.helpers.calls == [] + + +@pytest.mark.asyncio +async def test_create_ephemeral_mode_creates_and_waits() -> None: + client = _FakeClient() + client._create_deployment.response_data = {"id": "svc-new", "name": "svc-new"} + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="nginx:1.27", + ) + session = await sandbox_client.create(options=options) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.service_id == "svc-new" + assert inner.state.owned_by_client is True + assert len(client._create_deployment.calls) == 1 + create_call = client._create_deployment.calls[0] + assert create_call["data"]["deployment"]["external"]["imagePath"] == "nginx:1.27" + # wait_for_ready=True by default + assert len(client.helpers.calls) == 1 + + +@pytest.mark.asyncio +async def test_create_ephemeral_sets_docker_command() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="ubuntu:24.04", + docker_command="sleep infinity", + ) + await sandbox_client.create(options=options) + payload = client._create_deployment.calls[0]["data"] + assert payload["deployment"]["docker"] == { + "configType": "customCommand", + "customCommand": "sleep infinity", + } + + +@pytest.mark.asyncio +async def test_create_ephemeral_sets_entrypoint_and_command() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + docker_entrypoint="/bin/sh", + docker_command="-c 'tail -f /dev/null'", + ) + await sandbox_client.create(options=options) + payload = client._create_deployment.calls[0]["data"] + assert payload["deployment"]["docker"] == { + "configType": "customEntrypointCustomCommand", + "customEntrypoint": "/bin/sh", + "customCommand": "-c 'tail -f /dev/null'", + } + + +@pytest.mark.asyncio +async def test_create_ephemeral_omits_docker_block_by_default() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + await sandbox_client.create(options=options) + payload = client._create_deployment.calls[0]["data"] + assert "docker" not in payload["deployment"] + + +@pytest.mark.asyncio +async def test_create_ephemeral_deletes_service_when_wait_fails() -> None: + """If wait_for_service_ready raises, the service is already on + Northflank but no SandboxSession ever materialises — the client must + delete it best-effort so it doesn't leak.""" + client = _FakeClient() + client.helpers.raise_on_call = TimeoutError("deployment did not become ready") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + with pytest.raises(TimeoutError): + await sandbox_client.create(options=options) + assert len(client._delete_service.calls) == 1 + assert client._delete_service.calls[0]["service_id"] == "svc-new" + assert client._delete_service.calls[0]["delete_child_objects"] is True + + +@pytest.mark.asyncio +async def test_create_ephemeral_wait_cleanup_swallows_delete_failure() -> None: + """If both wait and the cleanup delete fail, the original wait error + must still propagate — the delete is best-effort and must not mask it.""" + client = _FakeClient() + client.helpers.raise_on_call = TimeoutError("deployment did not become ready") + client._delete_service.raise_on_call = RuntimeError("network blip") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + with pytest.raises(TimeoutError): + await sandbox_client.create(options=options) + + +@pytest.mark.asyncio +async def test_create_rejects_both_service_id_and_image_path() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", service_id="svc", image_path="nginx:1.27" + ) + with pytest.raises(ValueError): + await sandbox_client.create(options=options) + + +@pytest.mark.asyncio +async def test_create_requires_service_id_or_image() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions(project_id="proj") + with pytest.raises(ValueError): + await sandbox_client.create(options=options) + + +@pytest.mark.asyncio +async def test_resume_attaches_to_existing_state() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + state = _make_state(service_id="svc-resumed", owned=True) + session = await sandbox_client.resume(state) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.service_id == "svc-resumed" + # Resume must not create a new service or wait. + assert client._create_deployment.calls == [] + assert client.helpers.calls == [] + + +@pytest.mark.asyncio +async def test_delete_swallows_only_404() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + client._create_deployment.response_data = {"id": "svc-new"} + session = await sandbox_client.create( + options=NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + ) + + # 404 → swallowed (service already gone is not a cleanup failure) + client._delete_service.raise_on_call = ApiCallError(status=404, message="not found") + await sandbox_client.delete(session) + + # 401 → surfaced (cleanup masked an auth issue we want to see) + session = await sandbox_client.create( + options=NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + ) + client._delete_service.raise_on_call = ApiCallError(status=401, message="unauthorized") + with pytest.raises(ApiCallError) as excinfo: + await sandbox_client.delete(session) + assert excinfo.value.status == 401 + + +@pytest.mark.asyncio +async def test_delete_only_removes_client_owned_services() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + attached = await sandbox_client.create( + options=NorthflankSandboxClientOptions(project_id="proj", service_id="svc-existing") + ) + await sandbox_client.delete(attached) + assert client._delete_service.calls == [], "must not delete attached service" + + client._create_deployment.response_data = {"id": "svc-new"} + ephemeral = await sandbox_client.create( + options=NorthflankSandboxClientOptions(project_id="proj", image_path="alpine") + ) + await sandbox_client.delete(ephemeral) + assert len(client._delete_service.calls) == 1 + assert client._delete_service.calls[0]["service_id"] == "svc-new" + + +# -- workspace_persistence: volume mode ------------------------------------ + + +@pytest.mark.asyncio +async def test_volume_mode_create_provisions_volume_and_attaches_to_service() -> None: + client = _FakeClient() + client._create_deployment.response_data = {"id": "svc-new", "name": "svc-new"} + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="ubuntu:24.04", + docker_command="sleep infinity", + workspace_persistence="volume", + volume_spec={"storageSize": 20480, "accessMode": "ReadWriteOnce"}, + ) + + session = await sandbox_client.create(options=options) + + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.workspace_persistence == "volume" + assert inner.state.volume_id == "vol-new" + assert inner.state.owned_volume is True + # The service was deployed and the volume was created attached to it. + assert len(client._create_deployment.calls) == 1 + assert len(client._create_volume.calls) == 1 + volume_call = client._create_volume.calls[0] + payload = volume_call["data"] + assert payload["spec"] == {"storageSize": 20480, "accessMode": "ReadWriteOnce"} + assert payload["mounts"] == [{"containerMountPath": "/workspace"}] + assert payload["attachedObjects"] == [{"id": "svc-new", "type": "service"}] + # Readiness wait still ran — once, AFTER the volume was attached. + assert len(client.helpers.calls) == 1 + + +@pytest.mark.asyncio +async def test_volume_mode_uses_default_spec_when_unspecified() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + await sandbox_client.create(options=options) + payload = client._create_volume.calls[0]["data"] + assert payload["spec"] == { + "storageSize": 5120, + "accessMode": "ReadWriteMany", + "storageClassName": "nf-multi-rw", + } + + +@pytest.mark.asyncio +async def test_volume_mode_requires_image_path() -> None: + """volume mode only makes sense for client-created services.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + service_id="svc-existing", + workspace_persistence="volume", + ) + with pytest.raises(ValueError, match="image_path"): + await sandbox_client.create(options=options) + # No mutation made. + assert client._create_volume.calls == [] + assert client._create_deployment.calls == [] + + +@pytest.mark.asyncio +async def test_volume_mode_cleans_up_service_when_volume_creation_fails() -> None: + client = _FakeClient() + client._create_volume.raise_on_call = RuntimeError("volume quota exceeded") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + with pytest.raises(RuntimeError, match="quota"): + await sandbox_client.create(options=options) + # Service was created but volume failed — delete the stranded service. + assert len(client._delete_service.calls) == 1 + assert client._delete_service.calls[0]["service_id"] == "svc-new" + # No volume was created, so no volume delete either. + assert client._delete_volume.calls == [] + + +@pytest.mark.asyncio +async def test_volume_mode_cleans_up_volume_and_service_when_wait_fails() -> None: + """If the post-volume-attach readiness wait fails, both the volume and + service must be best-effort deleted so no stray resource leaks.""" + client = _FakeClient() + client.helpers.raise_on_call = TimeoutError("readiness deadline") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + with pytest.raises(TimeoutError): + await sandbox_client.create(options=options) + assert len(client._delete_volume.calls) == 1 + assert client._delete_volume.calls[0]["volume_id"] == "vol-new" + assert len(client._delete_service.calls) == 1 + + +@pytest.mark.asyncio +async def test_volume_mode_delete_runs_service_then_detach_then_volume() -> None: + """Northflank refuses to delete a volume while ``attachedObjects`` + still references the service, and deleting the service does not + auto-detach. delete() must therefore: (1) remove the service, + (2) explicitly detach the volume from it, (3) delete the volume.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + session = await sandbox_client.create(options=options) + + order: list[str] = [] + # Wrap each namespace entry so we record the actual call order. + real_delete_service = client._delete_service + + async def trace_delete_service(**kwargs: Any) -> Any: + order.append("delete.service") + return await real_delete_service(**kwargs) + + real_detach_volume = client._detach_volume + + async def trace_detach_volume(**kwargs: Any) -> Any: + order.append("detach.volume") + return await real_detach_volume(**kwargs) + + real_delete_volume = client._delete_volume + + async def trace_delete_volume(**kwargs: Any) -> Any: + order.append("delete.volume") + return await real_delete_volume(**kwargs) + + client.delete.service = trace_delete_service # type: ignore[assignment] + client.detach.volume = trace_detach_volume # type: ignore[assignment] + client.delete.volume = trace_delete_volume # type: ignore[assignment] + + await sandbox_client.delete(session) + assert order == ["delete.service", "detach.volume", "delete.volume"] + assert client._detach_volume.calls[0]["volume_id"] == "vol-new" + assert client._detach_volume.calls[0]["data"] == { + "nfObject": {"id": "svc-new", "type": "service"} + } + + +@pytest.mark.asyncio +async def test_volume_mode_delete_swallows_volume_404() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + session = await sandbox_client.create(options=options) + + client._delete_volume.raise_on_call = ApiCallError(status=404, message="gone") + await sandbox_client.delete(session) + # Service still gets removed even though the volume was already gone. + assert len(client._delete_service.calls) == 1 + + +@pytest.mark.asyncio +async def test_volume_mode_state_round_trips_through_serialize() -> None: + """volume_id, owned_volume, and workspace_persistence must survive + serialize/deserialize so resumed sessions can clean up properly.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + ) + session = await sandbox_client.create(options=options) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + + raw = sandbox_client.serialize_session_state(inner.state) + revived = sandbox_client.deserialize_session_state(raw) + assert isinstance(revived, NorthflankSandboxSessionState) + assert revived.workspace_persistence == "volume" + assert revived.volume_id == "vol-new" + assert revived.owned_volume is True + + +# -- workspace_persistence: caller-owned volume ---------------------------- + + +@pytest.mark.asyncio +async def test_volume_mode_attaches_caller_supplied_volume_to_new_service() -> None: + """Passing volume_id together with image_path attaches the existing + volume to the freshly-created service. No volume create happens, + and owned_volume stays False.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-existing", + ) + + session = await sandbox_client.create(options=options) + + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.workspace_persistence == "volume" + assert inner.state.volume_id == "vol-existing" + assert inner.state.owned_volume is False + # No volume creation. One attach call against the new service. + assert client._create_volume.calls == [] + assert len(client._attach_volume.calls) == 1 + attach = client._attach_volume.calls[0] + assert attach["volume_id"] == "vol-existing" + assert attach["data"] == {"nfObject": {"id": "svc-new", "type": "service"}} + # Readiness wait still ran AFTER the attach. + assert len(client.helpers.calls) == 1 + + +@pytest.mark.asyncio +async def test_volume_mode_attaches_caller_supplied_volume_to_existing_service() -> None: + """service_id + volume_id is a valid combination: attach the + caller's volume to the caller's service. Neither resource is owned + by the client.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + service_id="svc-existing", + workspace_persistence="volume", + volume_id="vol-existing", + ) + + session = await sandbox_client.create(options=options) + + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.owned_by_client is False + assert inner.state.owned_volume is False + assert inner.state.volume_id == "vol-existing" + # No service create, no service wait. + assert client._create_deployment.calls == [] + assert client.helpers.calls == [] + # Volume was attached to the caller's service. + assert len(client._attach_volume.calls) == 1 + assert client._attach_volume.calls[0]["data"] == { + "nfObject": {"id": "svc-existing", "type": "service"} + } + + +@pytest.mark.asyncio +async def test_volume_mode_rejects_volume_id_with_volume_spec() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-existing", + volume_spec={"storageSize": 5120, "accessMode": "ReadWriteMany"}, + ) + with pytest.raises(ValueError, match="volume_spec is ignored"): + await sandbox_client.create(options=options) + assert client._create_deployment.calls == [] + assert client._attach_volume.calls == [] + + +@pytest.mark.asyncio +async def test_volume_id_without_workspace_persistence_raises() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + volume_id="vol-existing", + ) + with pytest.raises(ValueError, match="workspace_persistence='volume'"): + await sandbox_client.create(options=options) + + +@pytest.mark.asyncio +async def test_volume_mode_tolerates_already_attached_409() -> None: + """If the caller's volume is already attached to this service, the + attach call returns 409; we treat that as success.""" + client = _FakeClient() + client._attach_volume.raise_on_call = ApiCallError( + status=409, message="Volume already attached" + ) + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-existing", + ) + + session = await sandbox_client.create(options=options) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert inner.state.volume_id == "vol-existing" + assert inner.state.owned_volume is False + + +@pytest.mark.asyncio +async def test_volume_mode_attach_failure_deletes_owned_service() -> None: + """If attach.volume raises a real (non-409) error, the client must + clean up the service it just created — otherwise it leaks.""" + client = _FakeClient() + client._attach_volume.raise_on_call = ApiCallError(status=404, message="Volume not found") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-missing", + ) + with pytest.raises(ApiCallError): + await sandbox_client.create(options=options) + assert len(client._delete_service.calls) == 1 + assert client._delete_service.calls[0]["service_id"] == "svc-new" + + +@pytest.mark.asyncio +async def test_volume_mode_wait_failure_detaches_caller_volume_no_delete() -> None: + """When readiness wait fails after attaching a caller-owned volume, + cleanup detaches the volume but never deletes it.""" + client = _FakeClient() + client.helpers.raise_on_call = TimeoutError("readiness deadline") + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-existing", + ) + with pytest.raises(TimeoutError): + await sandbox_client.create(options=options) + assert len(client._delete_service.calls) == 1 + assert len(client._detach_volume.calls) == 1 + assert client._detach_volume.calls[0]["volume_id"] == "vol-existing" + # Caller's volume must never be deleted on cleanup. + assert client._delete_volume.calls == [] + + +@pytest.mark.asyncio +async def test_volume_mode_delete_detaches_caller_volume_without_deleting() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="volume", + volume_id="vol-existing", + ) + session = await sandbox_client.create(options=options) + + await sandbox_client.delete(session) + # Service deleted (client-owned), volume detached, volume NOT deleted. + assert len(client._delete_service.calls) == 1 + assert len(client._detach_volume.calls) == 1 + assert client._detach_volume.calls[0]["volume_id"] == "vol-existing" + assert client._delete_volume.calls == [] + + +@pytest.mark.asyncio +async def test_volume_mode_delete_attach_mode_only_detaches() -> None: + """service_id + volume_id: the caller owns both. delete() must not + touch the service or delete the volume — only detach the volume so + we don't leave the caller's resources cross-wired to nothing.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + service_id="svc-existing", + workspace_persistence="volume", + volume_id="vol-existing", + ) + session = await sandbox_client.create(options=options) + + await sandbox_client.delete(session) + assert client._delete_service.calls == [] + assert client._delete_volume.calls == [] + assert len(client._detach_volume.calls) == 1 + assert client._detach_volume.calls[0]["data"] == { + "nfObject": {"id": "svc-existing", "type": "service"} + } + + +# -- workspace_persistence: tar mode --------------------------------------- + + +@pytest.mark.asyncio +async def test_tar_mode_persist_snapshot_captures_workspace_tar_into_state() -> None: + client = _FakeClient() + state = NorthflankSandboxSessionState( + manifest=Manifest(root="/workspace"), + snapshot=NoopSnapshotSpec().build("snap-1"), + project_id="proj", + service_id="svc-tar", + workspace_persistence="tar", + ) + session = NorthflankSandboxSession(state=state, client=client) + + # When _persist_snapshot runs it should tar the workspace and stash + # the bytes (base64) in state.persisted_workspace_tar_b64. + payload = _make_valid_tar_bytes({"./hello.txt": b"hi\n"}) + client.files.download_payload = payload + + await session._persist_snapshot() + + assert session.state.persisted_workspace_tar_b64 is not None + import base64 + + decoded = base64.b64decode(session.state.persisted_workspace_tar_b64) + assert decoded == payload + + +@pytest.mark.asyncio +async def test_tar_mode_default_persistence_is_noop_for_snapshot() -> None: + """Without workspace_persistence set, _persist_snapshot must not embed + a tar in state (default ephemeral behaviour unchanged).""" + client = _FakeClient() + state = NorthflankSandboxSessionState( + manifest=Manifest(root="/workspace"), + snapshot=NoopSnapshotSpec().build("snap-1"), + project_id="proj", + service_id="svc-default", + # workspace_persistence stays None (default). + ) + session = NorthflankSandboxSession(state=state, client=client) + await session._persist_snapshot() + assert session.state.persisted_workspace_tar_b64 is None + + +@pytest.mark.asyncio +async def test_tar_mode_prepare_backend_workspace_hydrates_from_state() -> None: + """On resume, _prepare_backend_workspace must replay the captured tar + into the container workspace.""" + client = _FakeClient() + payload = _make_valid_tar_bytes({"./resumed.txt": b"hello\n"}) + import base64 + + state = NorthflankSandboxSessionState( + manifest=Manifest(root="/workspace"), + snapshot=NoopSnapshotSpec().build("snap-1"), + project_id="proj", + service_id="svc-tar", + workspace_persistence="tar", + persisted_workspace_tar_b64=base64.b64encode(payload).decode("ascii"), + ) + session = NorthflankSandboxSession(state=state, client=client) + + await session._prepare_backend_workspace() + + # First call: mkdir -p /workspace. Then a tar upload + extract round trip. + assert client.exec.user_calls[0]["command"] == ["mkdir", "-p", "/workspace"] + assert len(client.files.uploads) == 1 + extract_cmds = [ + c["command"] + for c in client.exec.user_calls + if len(c["command"]) > 2 and "tar" in c["command"][2] + ] + assert extract_cmds, "expected a tar extract command to run" + + +@pytest.mark.asyncio +async def test_tar_mode_state_round_trips_through_serialize() -> None: + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + workspace_persistence="tar", + ) + session = await sandbox_client.create(options=options) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + # Volume should NOT have been provisioned in tar mode. + assert client._create_volume.calls == [] + assert inner.state.volume_id is None + assert inner.state.owned_volume is False + assert inner.state.workspace_persistence == "tar" + + raw = sandbox_client.serialize_session_state(inner.state) + revived = sandbox_client.deserialize_session_state(raw) + assert isinstance(revived, NorthflankSandboxSessionState) + assert revived.workspace_persistence == "tar" + + +@pytest.mark.asyncio +async def test_default_persistence_does_not_provision_volume() -> None: + """Default (workspace_persistence=None) must keep current behaviour: + no volume create, no state mutation, ephemeral lifecycle only.""" + client = _FakeClient() + sandbox_client = NorthflankSandboxClient(client=client) + options = NorthflankSandboxClientOptions( + project_id="proj", + image_path="alpine", + ) + session = await sandbox_client.create(options=options) + inner = session._inner + assert isinstance(inner, NorthflankSandboxSession) + assert client._create_volume.calls == [] + assert inner.state.workspace_persistence is None + assert inner.state.volume_id is None + assert inner.state.owned_volume is False + assert inner.state.persisted_workspace_tar_b64 is None + + await sandbox_client.delete(session) + assert client._delete_volume.calls == [] + assert len(client._delete_service.calls) == 1 From c0dab526dfcbe58284df4a030fd4bbc3199447d1 Mon Sep 17 00:00:00 2001 From: Simo Aleksandrov Date: Mon, 18 May 2026 00:44:59 +0300 Subject: [PATCH 2/2] docs(examples): add Northflank examples MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three example runners: * examples/northflank/list_services.py — agent uses northflank_tools() to enumerate services in a project and summarise them. * examples/northflank/remote_shell.py — agent uses ShellTool wired to NorthflankShellExecutor so every shell command runs inside a target Northflank service. * examples/sandbox/extensions/northflank_runner.py — manual sandbox runner: small workspace, one shell tool, plus a stop/resume snapshot round-trip check against a real Northflank service. --- examples/northflank/list_services.py | 44 +++ examples/northflank/remote_shell.py | 85 +++++ .../sandbox/extensions/northflank_runner.py | 319 ++++++++++++++++++ 3 files changed, 448 insertions(+) create mode 100644 examples/northflank/list_services.py create mode 100644 examples/northflank/remote_shell.py create mode 100644 examples/sandbox/extensions/northflank_runner.py diff --git a/examples/northflank/list_services.py b/examples/northflank/list_services.py new file mode 100644 index 0000000000..401b51f729 --- /dev/null +++ b/examples/northflank/list_services.py @@ -0,0 +1,44 @@ +"""Minimal example: ask an agent to list services in a Northflank project. + +Run:: + + OPENAI_API_KEY=... NF_API_TOKEN=... \\ + python examples/northflank/list_services.py demo-app +""" + +from __future__ import annotations + +import asyncio +import sys + +from northflank import AsyncApiClient + +from agents import Agent, Runner +from agents.extensions.northflank import NorthflankCtx, northflank_tools + + +async def main(project_id: str) -> None: + client = AsyncApiClient() + + agent = Agent( + name="northflank-ops", + instructions=( + "You manage Northflank services on the user's behalf. " + "When the user asks about services, call nf_list_services and " + "summarise the result in two or three sentences." + ), + tools=list(northflank_tools()), + ) + + result = await Runner.run( + agent, + f"Which services are running in project {project_id}? Group them by status.", + context=NorthflankCtx(client=client, project_id=project_id), + ) + print(result.final_output) + + +if __name__ == "__main__": + if len(sys.argv) != 2: + sys.exit(f"usage: {sys.argv[0]} ") + asyncio.run(main(sys.argv[1])) diff --git a/examples/northflank/remote_shell.py b/examples/northflank/remote_shell.py new file mode 100644 index 0000000000..574d9c4773 --- /dev/null +++ b/examples/northflank/remote_shell.py @@ -0,0 +1,85 @@ +"""Example: a ShellTool whose every command runs *inside* a Northflank service. + +Run:: + + OPENAI_API_KEY=... NF_API_TOKEN=... \\ + python examples/northflank/remote_shell.py demo-app api +""" + +from __future__ import annotations + +import asyncio +import os +import sys +from collections.abc import Sequence + +from northflank import AsyncApiClient + +from agents import Agent, ModelSettings, Runner, ShellTool +from agents.extensions.northflank import NorthflankCtx, NorthflankShellExecutor +from agents.items import ToolApprovalItem +from agents.run_context import RunContextWrapper +from agents.tool import ShellOnApprovalFunctionResult + +AUTO_APPROVE = os.environ.get("SHELL_AUTO_APPROVE") == "1" + + +async def prompt_for_approval(commands: Sequence[str]) -> bool: + if AUTO_APPROVE: + return True + print("Approve these commands?") + for command in commands: + print(" $", command) + return input("[y/N] ").strip().lower() in {"y", "yes"} + + +async def on_shell_approval( + _ctx: RunContextWrapper, item: ToolApprovalItem +) -> ShellOnApprovalFunctionResult: + raw = item.raw_item + commands: Sequence[str] = () + if isinstance(raw, dict): + action = raw.get("action", {}) + if isinstance(action, dict): + commands = action.get("commands", []) + else: + action_obj = getattr(raw, "action", None) + if action_obj is not None and hasattr(action_obj, "commands"): + commands = action_obj.commands + approved = await prompt_for_approval(commands) + return {"approve": approved, "reason": "ok" if approved else "user rejected"} + + +async def main(project_id: str, service_id: str) -> None: + client = AsyncApiClient() + executor = NorthflankShellExecutor(service_id=service_id, shell="sh") + + agent = Agent( + name="northflank-shell", + instructions=( + "Diagnose a misbehaving Northflank service. Run a small number of " + "shell commands inside the target container and explain the output." + ), + tools=[ + ShellTool( + executor=executor, + needs_approval=True, + on_approval=on_shell_approval, + ) + ], + model_settings=ModelSettings(tool_choice="auto"), + ) + + result = await Runner.run( + agent, + "Check memory usage and list the most recent files in /tmp.", + context=NorthflankCtx(client=client, project_id=project_id), + ) + print("\n--- final ---") + print(result.final_output) + + +if __name__ == "__main__": + if len(sys.argv) != 3: + sys.exit(f"usage: {sys.argv[0]} ") + asyncio.run(main(sys.argv[1], sys.argv[2])) diff --git a/examples/sandbox/extensions/northflank_runner.py b/examples/sandbox/extensions/northflank_runner.py new file mode 100644 index 0000000000..85a2450b3a --- /dev/null +++ b/examples/sandbox/extensions/northflank_runner.py @@ -0,0 +1,319 @@ +"""Minimal Northflank-backed sandbox example for manual validation. + +Creates a tiny workspace, lets the agent inspect it through one shell +tool, prints a short answer, then verifies a stop/resume snapshot round +trip against a real Northflank service. + +Pass ``--workspace-persistence volume`` to provision a Northflank volume +mounted at the sandbox workspace root — the volume survives stop/resume +and is removed automatically by ``client.delete`` at the end. Pass +``--workspace-persistence tar`` to capture the workspace as a tar +embedded in session state on stop and replay it on resume. +""" + +import argparse +import asyncio +import io +import os +import sys +from pathlib import Path +from typing import Any, Literal + +from openai.types.responses import ResponseTextDeltaEvent + +from agents import ModelSettings, Runner +from agents.run import RunConfig +from agents.sandbox import LocalSnapshotSpec, Manifest, SandboxAgent, SandboxRunConfig + +if __package__ is None or __package__ == "": + sys.path.insert(0, str(Path(__file__).resolve().parents[3])) + +from examples.sandbox.misc.example_support import text_manifest +from examples.sandbox.misc.workspace_shell import WorkspaceShellCapability + +try: + from agents.extensions.sandbox.northflank import ( + NorthflankSandboxClient, + NorthflankSandboxClientOptions, + ) +except Exception as exc: # pragma: no cover - import path depends on optional extras + raise SystemExit( + "Northflank sandbox examples require the optional repo extra.\n" + "Install it with: uv sync --extra northflank" + ) from exc + +from northflank import AsyncApiClient + +DEFAULT_QUESTION = "Summarize this cloud sandbox workspace in 2 sentences." +SNAPSHOT_CHECK_PATH = Path("snapshot-check.txt") +SNAPSHOT_CHECK_CONTENT = "northflank snapshot round-trip ok\n" + + +def _build_manifest() -> Manifest: + return text_manifest( + { + "README.md": ( + "# Renewal Notes\n\n" + "This workspace contains a tiny account review packet for manual sandbox testing.\n" + ), + "customer.md": ( + "# Customer\n\n" + "- Name: Northwind Health.\n" + "- Renewal date: 2026-04-15.\n" + "- Risk: unresolved SSO setup.\n" + ), + "next_steps.md": ( + "# Next steps\n\n" + "1. Finish the SSO fix.\n" + "2. Confirm legal language before procurement review.\n" + ), + } + ) + + +def _require_env(name: str) -> None: + if os.environ.get(name): + return + raise SystemExit(f"{name} must be set before running this example.") + + +def _build_options( + *, + project_id: str, + team_id: str | None, + image_path: str, + docker_command: str, + workspace_persistence: Literal["volume", "tar"] | None = None, + volume_storage_size_mb: int | None = None, +) -> NorthflankSandboxClientOptions: + # When the caller hasn't overridden the size, let the provider apply + # its default volume spec (small nf-multi-rw volume). + volume_spec: dict[str, Any] | None = None + if workspace_persistence == "volume" and volume_storage_size_mb is not None: + volume_spec = { + "storageSize": volume_storage_size_mb, + "accessMode": "ReadWriteOnce", + "storageClassName": "nf-multi-rw", + } + return NorthflankSandboxClientOptions( + project_id=project_id, + team_id=team_id, + image_path=image_path, + docker_command=docker_command, + wait_for_ready=True, + wait_timeout_s=420.0, + exec_timeout_s=120.0, + workspace_persistence=workspace_persistence, + volume_spec=volume_spec, + ) + + +async def _verify_stop_resume( + *, + project_id: str, + team_id: str | None, + image_path: str, + docker_command: str, + workspace_persistence: Literal["volume", "tar"] | None = None, + volume_storage_size_mb: int | None = None, +) -> None: + import tempfile + + nf = AsyncApiClient() + client = NorthflankSandboxClient(client=nf) + options = _build_options( + project_id=project_id, + team_id=team_id, + image_path=image_path, + docker_command=docker_command, + workspace_persistence=workspace_persistence, + volume_storage_size_mb=volume_storage_size_mb, + ) + with tempfile.TemporaryDirectory(prefix="nf-snapshot-example-") as snapshot_dir: + # Track the handle that currently owns the service so the outer + # ``finally`` can always delete it. Both the original and resumed + # sessions share the same ``service_id``, so either works. + cleanup_sandbox = None + try: + sandbox = await client.create( + manifest=_build_manifest(), + snapshot=LocalSnapshotSpec(base_path=Path(snapshot_dir)), + options=options, + ) + cleanup_sandbox = sandbox + + try: + await sandbox.start() + await sandbox.write( + SNAPSHOT_CHECK_PATH, + io.BytesIO(SNAPSHOT_CHECK_CONTENT.encode("utf-8")), + ) + await sandbox.stop() + finally: + await sandbox.shutdown() + + resumed_sandbox = await client.resume(sandbox.state) + cleanup_sandbox = resumed_sandbox + try: + await resumed_sandbox.start() + restored = await resumed_sandbox.read(SNAPSHOT_CHECK_PATH) + restored_text = restored.read() + if isinstance(restored_text, bytes): + restored_text = restored_text.decode("utf-8") + if restored_text != SNAPSHOT_CHECK_CONTENT: + raise RuntimeError( + "Snapshot resume verification failed: expected " + f"{SNAPSHOT_CHECK_CONTENT!r}, got {restored_text!r}" + ) + finally: + await resumed_sandbox.shutdown() + finally: + if cleanup_sandbox is not None: + # Northflank shutdown does not tear down the deployment; + # the only path that removes it is ``client.delete``. + await client.delete(cleanup_sandbox) + + mode = workspace_persistence or "ephemeral" + print(f"snapshot round-trip ok (northflank, {mode})") + + +async def main( + *, + model: str, + question: str, + project_id: str, + team_id: str | None, + image_path: str, + docker_command: str, + stream: bool, + workspace_persistence: Literal["volume", "tar"] | None = None, + volume_storage_size_mb: int | None = None, +) -> None: + _require_env("OPENAI_API_KEY") + _require_env("NF_API_TOKEN") + + await _verify_stop_resume( + project_id=project_id, + team_id=team_id, + image_path=image_path, + docker_command=docker_command, + workspace_persistence=workspace_persistence, + volume_storage_size_mb=volume_storage_size_mb, + ) + + manifest = _build_manifest() + agent = SandboxAgent( + name="Northflank Sandbox Assistant", + model=model, + instructions=( + "Answer questions about the sandbox workspace. Inspect the files before answering " + "and keep the response concise. " + "Do not invent files or statuses that are not present in the workspace. Cite the " + "file names you inspected." + ), + default_manifest=manifest, + capabilities=[WorkspaceShellCapability()], + model_settings=ModelSettings(tool_choice="required"), + ) + + nf = AsyncApiClient() + run_config = RunConfig( + sandbox=SandboxRunConfig( + client=NorthflankSandboxClient(client=nf), + options=_build_options( + project_id=project_id, + team_id=team_id, + image_path=image_path, + docker_command=docker_command, + workspace_persistence=workspace_persistence, + volume_storage_size_mb=volume_storage_size_mb, + ), + ), + workflow_name="Northflank sandbox example", + ) + + if not stream: + result = await Runner.run(agent, question, run_config=run_config) + print(result.final_output) + return + + stream_result = Runner.run_streamed(agent, question, run_config=run_config) + saw_text_delta = False + async for event in stream_result.stream_events(): + if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): + if not saw_text_delta: + print("assistant> ", end="", flush=True) + saw_text_delta = True + print(event.data.delta, end="", flush=True) + + if saw_text_delta: + print() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--model", default="gpt-5", help="Model name to use.") + parser.add_argument("--question", default=DEFAULT_QUESTION, help="Prompt to send to the agent.") + parser.add_argument( + "--project-id", + default=os.environ.get("NF_PROJECT_ID"), + help="Northflank project id (default: $NF_PROJECT_ID).", + ) + parser.add_argument( + "--team-id", + default=os.environ.get("NF_TEAM_ID"), + help="Optional Northflank team id (default: $NF_TEAM_ID).", + ) + parser.add_argument( + "--image-path", + default="ubuntu:24.04", + help="Base image for the ephemeral deployment.", + ) + parser.add_argument( + "--docker-command", + default="sleep infinity", + help="CMD override so the container stays alive long enough for exec.", + ) + parser.add_argument("--stream", action="store_true", default=False, help="Stream the response.") + parser.add_argument( + "--workspace-persistence", + choices=["volume", "tar"], + default=None, + help=( + "Workspace persistence strategy. 'volume' provisions a Northflank volume " + "mounted at the workspace root (survives stop/resume; auto-deleted on " + "client.delete). 'tar' captures the workspace tar into session state on " + "stop and restores it on resume." + ), + ) + parser.add_argument( + "--volume-storage-size-mb", + type=int, + default=None, + help=( + "Override the volume storageSize in MiB when " + "--workspace-persistence=volume. Defaults to the provider's " + "default volume spec (5120 MiB on nf-multi-rw — the minimum " + "Northflank accepts for that class)." + ), + ) + args = parser.parse_args() + + if not args.project_id: + raise SystemExit( + "Set NF_PROJECT_ID or pass --project-id; this example needs a Northflank project." + ) + + asyncio.run( + main( + model=args.model, + question=args.question, + project_id=args.project_id, + team_id=args.team_id, + image_path=args.image_path, + docker_command=args.docker_command, + stream=args.stream, + workspace_persistence=args.workspace_persistence, + volume_storage_size_mb=args.volume_storage_size_mb, + ) + )