diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 42153c20d..31eaae70f 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -3,6 +3,7 @@ import subprocess import time import uuid +import json from typing import Any from aiohttp import web @@ -25,6 +26,37 @@ logger = logging.getLogger(__name__) +def _truncate(text: str, max_len: int = 500) -> str: + """Truncate text for logging.""" + if len(text) <= max_len: + return text + return text[:max_len] + f"... ({len(text) - max_len} more chars)" + + +def _format_message(msg: dict) -> str: + """Format a message for logging.""" + role = msg.get("role", "unknown") + content = msg.get("content", "") + + # Handle tool calls + tool_calls = msg.get("tool_calls", []) + if tool_calls: + tools_summary = [] + for tc in tool_calls: + func = tc.get("function", {}) + name = func.get("name", "unknown") + args = func.get("arguments", "") + tools_summary.append(f"{name}({_truncate(args, 200)})") + return f"[{role}] tool_calls: {', '.join(tools_summary)}" + + # Handle tool results + if role == "tool": + tool_call_id = msg.get("tool_call_id", "") + return f"[{role}:{tool_call_id}] {_truncate(str(content), 300)}" + + return f"[{role}] {_truncate(str(content), 500)}" + + class CliAgentEnv(vf.MultiTurnEnv): """ Environment for running full agent code inside sandboxes. @@ -55,6 +87,7 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, + log_requests: bool = True, **kwargs, ): super().__init__(max_turns=max_turns, message_type="chat", **kwargs) @@ -77,12 +110,14 @@ def __init__( self.environment_vars = environment_vars self.team_id = team_id self.advanced_configs = advanced_configs + self.log_requests = log_requests self.active_rollouts: dict[str, dict[str, Any]] = {} self.intercepts: dict[str, dict[str, Any]] = {} # request_id -> intercept data self.interception_server: Any = None self._server_lock = asyncio.Lock() self._server_runner: Any = None self._server_site: Any = None + self._request_counts: dict[str, int] = {} # rollout_id -> request count def _ensure_cloudflared_installed(self) -> str: """Install cloudflared if not already installed. Returns path to cloudflared binary.""" @@ -318,12 +353,45 @@ async def get_prompt_messages(self, state: State) -> Messages: process request immediately with injected sampling_args, store response in intercept, return messages. 
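+
+        The queued request_id keys into ``self.intercepts``; each intercept
+        record is built by ``_handle_intercepted_request`` and looks roughly
+        like this (sketch; see that handler for the authoritative shape):
+
+            {
+                "request_id": "req_<8 hex chars>",
+                "rollout_id": "rollout_...",
+                "messages": [...],            # chat messages sent by the agent
+                "model": "...",
+                "tools": [...],
+                "stream_requested": False,
+                "response_future": asyncio.Future(),
+            }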
""" + rollout_id = state.get("rollout_id", "unknown") request_id_queue = state["request_id_queue"] - request_id = await asyncio.wait_for( - request_id_queue.get(), - timeout=self.request_timeout, - ) + # Track request count for logging + if rollout_id not in self._request_counts: + self._request_counts[rollout_id] = 0 + self._request_counts[rollout_id] += 1 + req_num = self._request_counts[rollout_id] + + # Poll for requests while checking completion periodically + # This avoids blocking for the full request_timeout when agent finishes + poll_interval = 5.0 # Check completion every 5 seconds + elapsed = 0.0 + request_id = None + while elapsed < self.request_timeout: + try: + request_id = await asyncio.wait_for( + request_id_queue.get(), + timeout=poll_interval, + ) + break # Got a request, continue processing + except asyncio.TimeoutError: + elapsed += poll_interval + # Check if agent signaled completion + if await self.agent_signaled_completion(state): + logger.debug("Agent signaled completion while waiting for request") + # Set flag for early exit - stop conditions will handle termination + state["_cli_agent_completed"] = True + return [] # Return empty messages, stop condition will trigger + # Check timeout + if await self.timeout_reached(state): + logger.debug("Timeout reached while waiting for request") + state["_cli_agent_completed"] = True + return [] + + if request_id is None: + raise asyncio.TimeoutError( + f"No request received within {self.request_timeout}s" + ) intercept = self.intercepts[request_id] messages = intercept["messages"] @@ -333,6 +401,16 @@ async def get_prompt_messages(self, state: State) -> Messages: request_tools = intercept.get("tools") effective_sampling_args = state.get("sampling_args") or {} + # Log the intercepted request + if self.log_requests: + logger.info( + f"[Request #{req_num}] model={request_model}, " + f"messages={len(messages)}, tools={len(request_tools or [])}" + ) + # Log the last few messages (most relevant context) + for msg in messages[-3:]: + logger.info(f" {_format_message(msg)}") + client = state.get("client") if client is None: raise RuntimeError("Client not set in state") @@ -347,6 +425,18 @@ async def get_prompt_messages(self, state: State) -> Messages: message_type=None, ) + # Log the response + if self.log_requests and response.choices: + choice = response.choices[0] + msg = choice.message + if msg.tool_calls: + tools = [f"{tc.function.name}(...)" for tc in msg.tool_calls] + logger.info(f"[Response #{req_num}] tool_calls: {', '.join(tools)}") + elif msg.content: + logger.info(f"[Response #{req_num}] {_truncate(msg.content, 200)}") + else: + logger.info(f"[Response #{req_num}] (empty)") + intercept["response_future"].set_result(response) intercept["response"] = response state["current_request_id"] = request_id @@ -367,6 +457,29 @@ async def get_model_response( Return cached response if available (set by get_prompt_messages). Otherwise fall back to parent implementation. 
""" + # If prompt is empty, we're in early-exit mode - return a dummy response + # The stop condition will terminate the rollout on the next iteration + if not prompt: + from openai.types.chat import ChatCompletion, ChatCompletionMessage + from openai.types.chat.chat_completion import Choice + + return ChatCompletion( + id="cli_agent_early_exit", + choices=[ + Choice( + finish_reason="stop", + index=0, + message=ChatCompletionMessage( + role="assistant", + content="", + ), + ) + ], + created=int(time.time()), + model=model, + object="chat.completion", + ) + for request_id, intercept in list(self.intercepts.items()): rollout_id = intercept.get("rollout_id") if rollout_id and rollout_id in self.active_rollouts: @@ -427,6 +540,18 @@ async def _handle_intercepted_request(self, request: Any) -> Any: {"error": f"Invalid JSON: {e}"}, status=400 ) + # Log request details including stream parameter + stream_requested = request_body.get("stream", False) + logger.info( + f"Intercepted request: stream={stream_requested}, " + f"model={request_body.get('model')}, " + f"messages={len(request_body.get('messages', []))}" + ) + + # Force non-streaming - we don't support SSE streaming yet + # The response will be converted to SSE format if stream was requested + request_body["stream"] = False + request_id = f"req_{uuid.uuid4().hex[:8]}" intercept = { "request_id": request_id, @@ -434,6 +559,7 @@ async def _handle_intercepted_request(self, request: Any) -> Any: "messages": request_body["messages"], "model": request_body.get("model"), "tools": request_body.get("tools"), + "stream_requested": stream_requested, # Remember if client wanted streaming "response_future": asyncio.Future(), } @@ -451,8 +577,130 @@ async def _handle_intercepted_request(self, request: Any) -> Any: response_dict = ( response.model_dump() if hasattr(response, "model_dump") else dict(response) ) + + # logger.info( + # f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" + # ) + + # If client requested streaming, convert to SSE format + if intercept.get("stream_requested", False): + return self._create_sse_response(response_dict) + return web.json_response(response_dict) # type: ignore + def _create_sse_response(self, response_dict: dict) -> web.Response: + """Convert a chat completion response to SSE streaming format.""" + response_id = response_dict.get("id", "chatcmpl-unknown") + created = response_dict.get("created", 0) + model = response_dict.get("model", "unknown") + + chunks = [] + + for choice in response_dict.get("choices", []): + message = choice.get("message", {}) + finish_reason = choice.get("finish_reason") + index = choice.get("index", 0) + + # First chunk: role + first_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"role": message.get("role", "assistant")}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(first_chunk)}\n\n") + + # Content chunk (if any) + content = message.get("content") + if content: + content_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"content": content}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(content_chunk)}\n\n") + + # Tool calls chunks (if any) + tool_calls = message.get("tool_calls", []) + if tool_calls: + # Send tool call with full info in one chunk + tc_delta = [] + for i, tc in 
enumerate(tool_calls): + tc_delta.append( + { + "index": i, + "id": tc.get("id"), + "type": tc.get("type", "function"), + "function": { + "name": tc.get("function", {}).get("name", ""), + "arguments": tc.get("function", {}).get( + "arguments", "" + ), + }, + } + ) + + tool_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"tool_calls": tc_delta}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(tool_chunk)}\n\n") + + # Final chunk with finish_reason + final_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {}, + "finish_reason": finish_reason, + } + ], + } + chunks.append(f"data: {json.dumps(final_chunk)}\n\n") + + # End of stream + chunks.append("data: [DONE]\n\n") + + body = "".join(chunks) + logger.debug(f"SSE response body:\n{body[:1500]}") + return web.Response( + body=body, + status=200, + content_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + @vf.teardown async def teardown_tunnel(self): """Stop all cloudflared tunnel processes""" @@ -482,6 +730,10 @@ async def cleanup_interception_context(self, state: State): del self.intercepts[request_id] del self.active_rollouts[rollout_id] + # Clean up request count + if rollout_id and rollout_id in self._request_counts: + del self._request_counts[rollout_id] + # Decrement active rollouts for the tunnel used by this rollout tunnel_url = state.get("tunnel_url") if tunnel_url: @@ -493,6 +745,11 @@ async def cleanup_interception_context(self, state: State): ) break + @vf.stop + async def early_exit_flag_set(self, state: State) -> bool: + """Check if early exit flag was set (by completion detection in get_prompt_messages)""" + return state.get("_cli_agent_completed", False) + @vf.stop async def agent_signaled_completion(self, state: State) -> bool: """Check for /tmp/vf_complete marker file""" diff --git a/verifiers/envs/harbor_cli_agent_env.py b/verifiers/envs/harbor_cli_agent_env.py new file mode 100644 index 000000000..612dc0203 --- /dev/null +++ b/verifiers/envs/harbor_cli_agent_env.py @@ -0,0 +1,316 @@ +import asyncio +import json +import logging +import tarfile +import tempfile +import uuid +from pathlib import Path +from typing import Any + +import tomli +from datasets import Dataset +from prime_sandboxes import ( + AsyncSandboxClient, + CreateSandboxRequest, +) + +import verifiers as vf +from verifiers.envs.cli_agent_env import CliAgentEnv + +logger = logging.getLogger(__name__) + + +class HarborCliAgentEnv(CliAgentEnv): + """CliAgentEnv subclass that loads Harbor-format tasks.""" + + def __init__( + self, + dataset_path: str | Path, + tasks: list[str] | None = None, + agent_workdir: str = "/app", + default_docker_image: str | None = None, + **kwargs, + ): + self.dataset_path = Path(dataset_path) + self.task_names = tasks + self.agent_workdir = agent_workdir + self.default_docker_image = default_docker_image + + if default_docker_image and "docker_image" not in kwargs: + kwargs["docker_image"] = default_docker_image + + dataset = self._load_harbor_dataset() + rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) + + super().__init__(dataset=dataset, rubric=rubric, **kwargs) + + def _load_harbor_dataset(self) -> Dataset: + """Load Harbor tasks from dataset directory into a Dataset with prompts.""" + if not self.dataset_path.exists(): + 
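+            # Fail fast with a clear error rather than silently yielding no tasks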
raise FileNotFoundError(f"Dataset path not found: {self.dataset_path}") + + tasks = [] + for task_dir in sorted(self.dataset_path.iterdir()): + if not task_dir.is_dir(): + continue + + if self.task_names and task_dir.name not in self.task_names: + continue + + task_toml = task_dir / "task.toml" + instruction_md = task_dir / "instruction.md" + + if not task_toml.exists() or not instruction_md.exists(): + logger.warning( + f"Skipping {task_dir.name}: missing task.toml or instruction.md" + ) + continue + + with open(task_toml, "rb") as f: + config = tomli.load(f) + + instruction = instruction_md.read_text().strip() + + docker_image = ( + config.get("environment", {}).get("docker_image") + or self.default_docker_image + ) + if not docker_image: + logger.warning( + f"Skipping {task_dir.name}: no environment.docker_image in task.toml " + "and no default_docker_image provided. " + "Run harbor_build.py first to build/push images." + ) + continue + + # TODO: remove this prompt + messages = [ + { + "role": "system", + "content": ( + "You are an autonomous agent inside a Harbor sandbox. " + f"Work in {self.agent_workdir}, follow the user instruction, write /tmp/vf_complete " + "when finished, and do not tamper with /tests or /oracle." + ), + }, + {"role": "user", "content": instruction}, + ] + + task_entry = { + "example_id": len(tasks), + "task": task_dir.name, + "prompt": messages, + "info": { + "task_dir": str(task_dir), + "docker_image": docker_image, + "config": config, + }, + } + + tasks.append(task_entry) + + if not tasks: + raise ValueError(f"No valid Harbor tasks found in {self.dataset_path}") + + logger.info(f"Loaded {len(tasks)} Harbor tasks from {self.dataset_path}") + return Dataset.from_list(tasks) + + async def setup_state(self, state: vf.State) -> vf.State: + """Create sandbox per rollout with Harbor assets uploaded.""" + # Skip CliAgentEnv.setup_state (needs per-task docker image); call MultiTurnEnv.setup_state + state = await super(CliAgentEnv, self).setup_state(state) # type: ignore[misc] + + task_info: dict[str, Any] = state.get("info", {}) or {} + task_dir = Path(task_info.get("task_dir", "")) + config = task_info.get("config", {}) + docker_image = task_info.get("docker_image") or self.docker_image + + if not task_dir.exists(): + raise FileNotFoundError(f"Task directory not found: {task_dir}") + + rollout_id = f"rollout_{uuid.uuid4().hex[:8]}" + state["rollout_id"] = rollout_id + + await self._ensure_interception_server() + + tunnel_url: str | None = None + if self.interception_host is None: + tunnel_url = await self._get_tunnel_url() + state["interception_base_url"] = f"{tunnel_url}/rollout/{rollout_id}/v1" + else: + state["interception_base_url"] = ( + f"http://{self.interception_host}:{self.interception_port}/rollout/{rollout_id}/v1" + ) + + env_vars = dict(self.environment_vars) if self.environment_vars else {} + env_vars["OPENAI_BASE_URL"] = state["interception_base_url"] + model = state.get("model") + if model: + env_vars["OPENAI_MODEL"] = model + env_vars.setdefault("HARBOR_TASK_NAME", state.get("task", "")) + env_vars.setdefault("HARBOR_TASK_DIR", "/task") + env_vars.setdefault("HARBOR_INSTRUCTION_PATH", "/task/instruction.md") + if self.agent_workdir: + env_vars.setdefault("AGENT_WORKDIR", self.agent_workdir) + + sandbox_client = AsyncSandboxClient() + sandbox_request = CreateSandboxRequest( + name=f"harbor-cli-agent-{rollout_id}", + docker_image=docker_image, + start_command=self.start_command, + cpu_cores=self.cpu_cores, + memory_gb=self.memory_gb, + 
disk_size_gb=self.disk_size_gb, + gpu_count=self.gpu_count, + timeout_minutes=self.timeout_minutes, + environment_vars=env_vars, + team_id=self.team_id, + advanced_configs=self.advanced_configs, + ) + + logger.debug( + f"Creating sandbox with OPENAI_BASE_URL={env_vars.get('OPENAI_BASE_URL')} " + f"docker_image={docker_image}" + ) + sandbox = await sandbox_client.create(sandbox_request) + state["sandbox_id"] = sandbox.id + await sandbox_client.wait_for_creation(sandbox.id) + + await self._prepare_harbor_task(sandbox_client, sandbox.id, task_dir) + state["harbor_config"] = config + state["harbor_task_dir"] = str(task_dir) + + request_id_queue: asyncio.Queue[str] = asyncio.Queue() + state["request_id_queue"] = request_id_queue + state["current_request_id"] = None + state["tunnel_url"] = tunnel_url if self.interception_host is None else None + self.active_rollouts[rollout_id] = { + "request_id_queue": request_id_queue, + "current_request_id": None, + } + + return state + + async def _prepare_harbor_task( + self, sandbox_client: AsyncSandboxClient, sandbox_id: str, task_dir: Path + ) -> None: + """Upload solution/tests and make log directory.""" + solution_dir = task_dir / "solution" + tests_dir = task_dir / "tests" + instruction_path = task_dir / "instruction.md" + task_toml_path = task_dir / "task.toml" + + if solution_dir.exists(): + await self._upload_directory( + sandbox_client, sandbox_id, solution_dir, "/oracle" + ) + logger.debug(f"Uploaded solution for {task_dir.name} to /oracle") + + if tests_dir.exists(): + await self._upload_directory( + sandbox_client, sandbox_id, tests_dir, "/tests" + ) + logger.debug(f"Uploaded tests for {task_dir.name} to /tests") + + mkdir_cmd = f"mkdir -p /logs/verifier /task {self.agent_workdir}" + await sandbox_client.execute_command(sandbox_id, mkdir_cmd, working_dir=None) + + if instruction_path.exists(): + await sandbox_client.upload_file( + sandbox_id, "/task/instruction.md", str(instruction_path) + ) + if task_toml_path.exists(): + await sandbox_client.upload_file( + sandbox_id, "/task/task.toml", str(task_toml_path) + ) + + async def _upload_directory( + self, + sandbox_client: AsyncSandboxClient, + sandbox_id: str, + local_dir: Path, + remote_path: str, + ) -> None: + """Tar + upload a directory into the sandbox.""" + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: + tar_path = Path(tmp_file.name) + + try: + with tarfile.open(tar_path, "w:gz") as tar: + for item in local_dir.iterdir(): + tar.add(item, arcname=item.name) + + remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" + await sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) + await sandbox_client.execute_command( + sandbox_id, + f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", + working_dir=None, + ) + finally: + tar_path.unlink(missing_ok=True) + + async def post_rollout(self, state: vf.State): + """Run Harbor tests to compute reward before sandbox destruction.""" + reward = await self._compute_reward(state) + state["harbor_reward_value"] = reward + state["reward"] = reward + + async def harbor_reward(self, state: vf.State, **kwargs) -> float: + return state.get("harbor_reward_value", 0.0) + + async def _compute_reward(self, state: vf.State) -> float: + """ + Execute Harbor tests (tests/test.sh) inside the sandbox to compute reward. + Prioritizes /logs/verifier/reward.txt, falling back to reward.json. 
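+
+        Illustrative outputs a test.sh might write (either file is accepted):
+
+            /logs/verifier/reward.txt   ->  1.0
+            /logs/verifier/reward.json  ->  {"reward": 1.0}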
+ """ + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + logger.error("No sandbox_id in state") + return 0.0 + + sandbox_client = AsyncSandboxClient() + try: + logger.info(f"Running Harbor tests for task {state.get('task')}") + await sandbox_client.execute_command( + sandbox_id, "bash test.sh", working_dir="/tests" + ) + + reward_txt = await sandbox_client.execute_command( + sandbox_id, + "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", + working_dir=None, + ) + reward_txt_val = self._stdout_text(reward_txt) + if reward_txt_val: + value = float(reward_txt_val) + logger.info(f"Reward from reward.txt: {value}") + return value + + reward_json = await sandbox_client.execute_command( + sandbox_id, + "cat /logs/verifier/reward.json 2>/dev/null || echo ''", + working_dir=None, + ) + reward_json_val = self._stdout_text(reward_json) + if reward_json_val: + data = json.loads(reward_json_val) + value = float(data.get("reward", 0.0)) + logger.info(f"Reward from reward.json: {value}") + return value + + logger.warning("No reward.txt or reward.json produced by Harbor tests") + return 0.0 + except Exception as e: + logger.error(f"Error computing Harbor reward: {e}") + return 0.0 + + @staticmethod + def _stdout_text(result: Any) -> str: + """Extract trimmed stdout from a Sandbox command result.""" + stdout_val = getattr(result, "stdout", "") + if stdout_val is None: + return "" + if isinstance(stdout_val, str): + return stdout_val.strip() + return str(stdout_val).strip() diff --git a/verifiers/envs/harbor_env.py b/verifiers/envs/harbor_env.py new file mode 100644 index 000000000..0e64e1b1d --- /dev/null +++ b/verifiers/envs/harbor_env.py @@ -0,0 +1,275 @@ +""" +Supports Harbor task format with: +- instruction.md: Task instruction +- task.toml: Task configuration +- environment/: Environment definition (Dockerfile) +- solution/: Solution scripts +- tests/: Test scripts (outputs reward.txt or reward.json) +""" + +import json +import logging +import tarfile +import tempfile +from pathlib import Path + +import tomli +from datasets import Dataset + +import verifiers as vf +from verifiers.envs.sandbox_env import SandboxEnv + +logger = logging.getLogger(__name__) + + +class HarborEnv(SandboxEnv): + """ + Environment for Harbor-format tasks using Prime sandboxes. + + Each Harbor task is loaded as a separate example in the dataset. + The agent is given the task instruction and has access to a bash tool + to complete the task. After the agent finishes, tests are run to verify + completion and compute the reward. 
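+
+    Expected task layout under ``dataset_path`` (one directory per task):
+
+        <dataset_path>/<task_name>/
+            instruction.md   # task instruction shown to the agent
+            task.toml        # config, incl. environment.docker_image
+            environment/     # environment definition (Dockerfile)
+            solution/        # oracle solution, uploaded to /oracle
+            tests/           # test scripts, uploaded to /tests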
+ """ + + def __init__( + self, + dataset_path: Path | str | None = None, + tasks: list[str] | None = None, + timeout_minutes: int = 60, + cpu_cores: int = 2, + memory_gb: int = 4, + disk_size_gb: int = 10, + **kwargs, + ): + self.dataset_path = Path(dataset_path) if dataset_path else None + self.task_names = tasks + self.timeout_minutes = timeout_minutes + self.cpu_cores = cpu_cores + self.memory_gb = memory_gb + self.disk_size_gb = disk_size_gb + + dataset = self._load_harbor_dataset() + rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) + + super().__init__( + dataset=dataset, + rubric=rubric, + timeout_minutes=timeout_minutes, + cpu_cores=cpu_cores, + memory_gb=memory_gb, + disk_size_gb=disk_size_gb, + **kwargs, + ) + + def _load_harbor_dataset(self) -> Dataset: + """Load Harbor tasks from dataset directory into HuggingFace Dataset.""" + if self.dataset_path is None: + raise ValueError("dataset_path must be provided") + + if not self.dataset_path.exists(): + raise FileNotFoundError(f"Dataset path not found: {self.dataset_path}") + + tasks = [] + task_dirs = sorted(self.dataset_path.iterdir()) + + for task_dir in task_dirs: + if not task_dir.is_dir(): + continue + + if self.task_names and task_dir.name not in self.task_names: + continue + + task_toml = task_dir / "task.toml" + instruction_md = task_dir / "instruction.md" + + if not task_toml.exists() or not instruction_md.exists(): + logger.warning( + f"Skipping {task_dir.name}: missing task.toml or instruction.md" + ) + continue + + with open(task_toml, "rb") as f: + config = tomli.load(f) + + instruction = instruction_md.read_text().strip() + + docker_image = config.get("environment", {}).get("docker_image") + if not docker_image: + logger.warning( + f"Skipping {task_dir.name}: no docker_image in task.toml. " + f"Run harbor_build.py first to build and push images." + ) + continue + + task_entry = { + "example_id": len(tasks), + "task": task_dir.name, + "question": instruction, + "answer": "", # Harbor tasks don't have reference answers + "info": { + "task_dir": str(task_dir), + "docker_image": docker_image, + "config": config, + }, + } + + tasks.append(task_entry) + + if not tasks: + raise ValueError(f"No valid Harbor tasks found in {self.dataset_path}") + + logger.info(f"Loaded {len(tasks)} Harbor tasks from {self.dataset_path}") + + return Dataset.from_list(tasks) + + async def setup_state(self, state: vf.State, **kwargs) -> vf.State: + """ + Creates the sandbox, uploads solution and test files, + and prepares the environment for the agent. 
+ """ + task_info = state.get("info", {}) + task_dir = Path(task_info.get("task_dir", "")) + docker_image = task_info.get("docker_image") + config = task_info.get("config", {}) + + if not task_dir.exists(): + raise FileNotFoundError(f"Task directory not found: {task_dir}") + + if docker_image: + self.sandbox_request.docker_image = docker_image + + state = await super().setup_state(state, **kwargs) + + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + raise RuntimeError("Sandbox not created in parent setup_state") + + await self.sandbox_client.wait_for_creation(sandbox_id) + + # Upload solution folder to /oracle + solution_dir = task_dir / "solution" + if solution_dir.exists(): + await self._upload_directory(sandbox_id, solution_dir, "/oracle") + logger.debug(f"Uploaded solution to /oracle in sandbox {sandbox_id}") + + # Upload tests folder to /tests + tests_dir = task_dir / "tests" + if tests_dir.exists(): + await self._upload_directory(sandbox_id, tests_dir, "/tests") + logger.debug(f"Uploaded tests to /tests in sandbox {sandbox_id}") + + # Create /logs/verifier directory for test outputs (run from root) + await self.bash("mkdir -p /logs/verifier", sandbox_id, working_dir=None) + + # Store task config in state for reward function + state["harbor_config"] = config + state["harbor_task_dir"] = str(task_dir) + + return state + + def update_tool_args( + self, + tool_name: str, + tool_args: dict, + messages: vf.Messages, + state: vf.State, + **kwargs, + ) -> dict: + """Inject working_dir=/app for agent bash calls.""" + updated_args = super().update_tool_args( + tool_name, tool_args, messages, state, **kwargs + ) + + # For bash commands, inject working_dir=/app for agent calls + if tool_name == "bash" and "working_dir" not in updated_args: + updated_args["working_dir"] = "/app" + + return updated_args + + async def _upload_directory( + self, sandbox_id: str, local_dir: Path, remote_path: str + ): + """Upload a directory to the sandbox using tar.""" + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: + tar_path = Path(tmp_file.name) + + try: + with tarfile.open(tar_path, "w:gz") as tar: + for item in local_dir.iterdir(): + tar.add(item, arcname=item.name) + + # Upload tar to sandbox (upload_file params are sandbox_id, remote_path, local_path) + remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" + await self.sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) + + # Extract in sandbox (run from root to access /tmp and create directories) + await self.bash( + f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", + sandbox_id, + working_dir=None, + ) + finally: + tar_path.unlink(missing_ok=True) + + async def post_rollout(self, state: vf.State): + reward = await self._compute_reward(state) + state["harbor_reward_value"] = reward + + async def harbor_reward(self, state: vf.State, **kwargs) -> float: + return state.get("harbor_reward_value", 0.0) + + async def _compute_reward(self, state: vf.State) -> float: + """ + Compute reward by running Harbor tests. 
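+        The test entrypoint is ``tests/test.sh``, executed as ``bash test.sh``
+        with ``/tests`` as the working directory.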
+ + Harbor tests should output either: + - /logs/verifier/reward.txt: Single number (0 or 1 typically) + - /logs/verifier/reward.json: JSON with "reward" field + + Returns: + float: Reward value (typically 0 or 1) + """ + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + logger.error("No sandbox_id in state") + return 0.0 + + try: + logger.info(f"Running tests for task {state.get('task')}") + result = await self.bash("bash test.sh", sandbox_id, working_dir="/tests") + logger.debug(f"Test script output: {result}") + + # Try to read reward.txt first + reward_txt_result = await self.bash( + "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", + sandbox_id, + working_dir=None, + ) + + if reward_txt_result and reward_txt_result.strip(): + reward_value = float(reward_txt_result.strip()) + logger.info(f"Reward from reward.txt: {reward_value}") + return reward_value + + # Fall back to reward.json + reward_json_result = await self.bash( + "cat /logs/verifier/reward.json 2>/dev/null || echo ''", + sandbox_id, + working_dir=None, + ) + + if reward_json_result and reward_json_result.strip(): + reward_data = json.loads(reward_json_result) + reward_value = float(reward_data.get("reward", 0.0)) + logger.info(f"Reward from reward.json: {reward_value}") + return reward_value + + logger.warning( + f"No reward.txt or reward.json found for task {state.get('task')}" + ) + return 0.0 + + except Exception as e: + logger.error(f"Error computing reward: {e}") + return 0.0 diff --git a/verifiers/envs/sandbox_env.py b/verifiers/envs/sandbox_env.py index c21125f98..deb1f850a 100644 --- a/verifiers/envs/sandbox_env.py +++ b/verifiers/envs/sandbox_env.py @@ -49,9 +49,11 @@ def __init__( advanced_configs=advanced_configs, ) self.active_sandboxes = set() - self.add_tool(self.bash, args_to_skip=["sandbox_id"]) + self.add_tool(self.bash, args_to_skip=["sandbox_id", "working_dir"]) - async def bash(self, command: str, sandbox_id: str) -> str: + async def bash( + self, command: str, sandbox_id: str, working_dir: str | None = None + ) -> str: """Execute `command` inside persistent sandbox container.""" # sandbox_id is passed via update_tool_args, not seen by model s = time.time() @@ -60,8 +62,15 @@ async def bash(self, command: str, sandbox_id: str) -> str: ) # wait for sandbox to be created self.logger.debug(f"Waited {time.time() - s:.1f}s for sandbox to be ready") s = time.time() - self.logger.debug(f"Executing command {command} in sandbox {sandbox_id}") - results = await self.sandbox_client.execute_command(sandbox_id, command) + # Execute command with optional working directory + if working_dir: + self.logger.debug(f"Executing command in {working_dir}: {command}") + results = await self.sandbox_client.execute_command( + sandbox_id, command, working_dir=working_dir + ) + else: + self.logger.debug(f"Executing command {command} in sandbox {sandbox_id}") + results = await self.sandbox_client.execute_command(sandbox_id, command) e = time.time() stdout = results.stdout.strip() stderr = (results.stderr or "").strip() @@ -128,7 +137,7 @@ async def bulk_delete_sandboxes(self, global_ids: list[str]) -> None: except Exception as e: self.logger.error(f"Failed to bulk delete sandboxes {global_ids}: {e}") - @vf.teardown # type: ignore + @vf.teardown # type: ignore async def teardown_sandboxes(self): """Delete all active sandboxes""" if len(self.active_sandboxes) == 0: