From 10ba66556e0844cdae7e22daea995297f88b6016 Mon Sep 17 00:00:00 2001 From: Dominik Date: Sat, 8 Nov 2025 22:11:19 +0100 Subject: [PATCH 01/22] Bump prime sandbox version. (#546) --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4f05c9475..90afe1ed1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ dependencies = [ "textual", "pydantic>=2.11.9", "tomli; python_version < '3.11'", - "prime-sandboxes>=0.1.0", + "prime-sandboxes>=0.2.2", "wget>=3.2", ] From 25932ab7fc43a705b0a69310e5834ab2a25fc3c6 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 12 Nov 2025 07:42:33 +0100 Subject: [PATCH 02/22] Serialize OAI responses before saving (#554) --- verifiers/utils/eval_utils.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/verifiers/utils/eval_utils.py b/verifiers/utils/eval_utils.py index 3c2c18d38..66bfc8715 100644 --- a/verifiers/utils/eval_utils.py +++ b/verifiers/utils/eval_utils.py @@ -189,7 +189,12 @@ def make_dataset(results: GenerateOutputs, **kwargs) -> Dataset: state_columns = results.metadata.state_columns if state_columns: for col in state_columns: - results_dict[col] = [s.get(col) for s in results.state] + if col == "responses": + results_dict[col] = [ + [r.model_dump() for r in s.get(col, [])] for s in results.state + ] + else: + results_dict[col] = [s.get(col) for s in results.state] return Dataset.from_dict(results_dict) From 3f20f0f99e8fd648ff9aefdba1b39ff0bd1602ed Mon Sep 17 00:00:00 2001 From: William Brown Date: Thu, 13 Nov 2025 18:29:50 -0800 Subject: [PATCH 03/22] pre-install libs for pythonenv --- verifiers/envs/python_env.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/verifiers/envs/python_env.py b/verifiers/envs/python_env.py index 06479e445..f318ce03f 100644 --- a/verifiers/envs/python_env.py +++ b/verifiers/envs/python_env.py @@ -111,6 +111,8 @@ def ensure_fifo(path: str) -> None: rm -f "$command_fifo" "$response_fifo" "$ready_flag" + pip install -q numpy sympy scipy + python - <<'PY' import base64 from pathlib import Path From 0b5ad34449651113afe7ad396838c7677ab7a1ae Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Tue, 18 Nov 2025 18:20:59 -0800 Subject: [PATCH 04/22] harbor env --- verifiers/envs/harbor_env.py | 267 +++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 verifiers/envs/harbor_env.py diff --git a/verifiers/envs/harbor_env.py b/verifiers/envs/harbor_env.py new file mode 100644 index 000000000..8cc1de3e8 --- /dev/null +++ b/verifiers/envs/harbor_env.py @@ -0,0 +1,267 @@ +""" +Supports Harbor task format with: +- instruction.md: Task instruction +- task.toml: Task configuration +- environment/: Environment definition (Dockerfile) +- solution/: Solution scripts +- tests/: Test scripts (outputs reward.txt or reward.json) +""" + +import json +import logging +import tarfile +import tempfile +from pathlib import Path + +import tomli +from datasets import Dataset + +import verifiers as vf +from verifiers.envs.sandbox_env import SandboxEnv + +logger = logging.getLogger(__name__) + + +class HarborEnv(SandboxEnv): + """ + Environment for Harbor-format tasks using Prime sandboxes. + + Each Harbor task is loaded as a separate example in the dataset. + The agent is given the task instruction and has access to a bash tool + to complete the task. After the agent finishes, tests are run to verify + completion and compute the reward. + """ + + def __init__( + self, + dataset_path: Path | str | None = None, + tasks: list[str] | None = None, + timeout_minutes: int = 60, + cpu_cores: int = 2, + memory_gb: int = 4, + disk_size_gb: int = 10, + **kwargs, + ): + self.dataset_path = Path(dataset_path) if dataset_path else None + self.task_names = tasks + self.timeout_minutes = timeout_minutes + self.cpu_cores = cpu_cores + self.memory_gb = memory_gb + self.disk_size_gb = disk_size_gb + + dataset = self._load_harbor_dataset() + rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) + + super().__init__( + dataset=dataset, + rubric=rubric, + timeout_minutes=timeout_minutes, + cpu_cores=cpu_cores, + memory_gb=memory_gb, + disk_size_gb=disk_size_gb, + **kwargs, + ) + + def _load_harbor_dataset(self) -> Dataset: + """Load Harbor tasks from dataset directory into HuggingFace Dataset.""" + if self.dataset_path is None: + raise ValueError("dataset_path must be provided") + + if not self.dataset_path.exists(): + raise FileNotFoundError(f"Dataset path not found: {self.dataset_path}") + + tasks = [] + task_dirs = sorted(self.dataset_path.iterdir()) + + for task_dir in task_dirs: + if not task_dir.is_dir(): + continue + + # Filter by task names if specified + if self.task_names and task_dir.name not in self.task_names: + continue + + # Check for required files + task_toml = task_dir / "task.toml" + instruction_md = task_dir / "instruction.md" + + if not task_toml.exists() or not instruction_md.exists(): + logger.warning( + f"Skipping {task_dir.name}: missing task.toml or instruction.md" + ) + continue + + # Load task config + with open(task_toml, "rb") as f: + config = tomli.load(f) + + # Load instruction + instruction = instruction_md.read_text().strip() + + # Check for docker_image in config + docker_image = config.get("environment", {}).get("docker_image") + if not docker_image: + logger.warning( + f"Skipping {task_dir.name}: no docker_image in task.toml. " + f"Run harbor_build.py first to build and push images." + ) + continue + + # Create task entry + task_entry = { + "example_id": len(tasks), + "task": task_dir.name, + "question": instruction, + "answer": "", # Harbor tasks don't have reference answers + "info": { + "task_dir": str(task_dir), + "docker_image": docker_image, + "config": config, + }, + } + + tasks.append(task_entry) + + if not tasks: + raise ValueError(f"No valid Harbor tasks found in {self.dataset_path}") + + logger.info(f"Loaded {len(tasks)} Harbor tasks from {self.dataset_path}") + + # Convert to HuggingFace Dataset + return Dataset.from_list(tasks) + + async def setup_state(self, state: vf.State, **kwargs) -> vf.State: + """ + Creates the sandbox, uploads solution and test files, + and prepares the environment for the agent. + """ + # Get task info and update sandbox request BEFORE creating sandbox + task_info = state.get("info", {}) + task_dir = Path(task_info.get("task_dir", "")) + docker_image = task_info.get("docker_image") + config = task_info.get("config", {}) + + if not task_dir.exists(): + raise FileNotFoundError(f"Task directory not found: {task_dir}") + + # Update sandbox request with task-specific docker image + if docker_image: + self.sandbox_request.docker_image = docker_image + + # Call parent setup to create sandbox (this will use updated docker_image) + state = await super().setup_state(state, **kwargs) + + # Get sandbox_id + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + raise RuntimeError("Sandbox not created in parent setup_state") + + # Wait for sandbox to be ready before uploading files + await self.sandbox_client.wait_for_creation(sandbox_id) + + # Upload solution folder to /oracle + solution_dir = task_dir / "solution" + if solution_dir.exists(): + await self._upload_directory(sandbox_id, solution_dir, "/oracle") + logger.debug(f"Uploaded solution to /oracle in sandbox {sandbox_id}") + + # Upload tests folder to /tests + tests_dir = task_dir / "tests" + if tests_dir.exists(): + await self._upload_directory(sandbox_id, tests_dir, "/tests") + logger.debug(f"Uploaded tests to /tests in sandbox {sandbox_id}") + + # Create /logs/verifier directory for test outputs + await self.bash("mkdir -p /logs/verifier", sandbox_id) + + # Store task config in state for reward function + state["harbor_config"] = config + state["harbor_task_dir"] = str(task_dir) + + return state + + async def _upload_directory( + self, sandbox_id: str, local_dir: Path, remote_path: str + ): + """Upload a directory to the sandbox using tar.""" + # Create tar file + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: + tar_path = Path(tmp_file.name) + + try: + # Create tar archive + with tarfile.open(tar_path, "w:gz") as tar: + for item in local_dir.iterdir(): + tar.add(item, arcname=item.name) + + # Upload tar to sandbox (note: upload_file params are sandbox_id, remote_path, local_path) + remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" + await self.sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) + + # Extract in sandbox + await self.bash( + f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", + sandbox_id, + ) + finally: + tar_path.unlink(missing_ok=True) + + async def post_rollout(self, state: vf.State): + reward = await self._compute_reward(state) + state["harbor_reward_value"] = reward + + async def harbor_reward(self, state: vf.State, **kwargs) -> float: + return state.get("harbor_reward_value", 0.0) + + async def _compute_reward(self, state: vf.State) -> float: + """ + Compute reward by running Harbor tests. + + Harbor tests should output either: + - /logs/verifier/reward.txt: Single number (0 or 1 typically) + - /logs/verifier/reward.json: JSON with "reward" field + + Returns: + float: Reward value (typically 0 or 1) + """ + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + logger.error("No sandbox_id in state") + return 0.0 + + try: + # Run test script + logger.info(f"Running tests for task {state.get('task')}") + result = await self.bash("cd /tests && bash test.sh", sandbox_id) + logger.debug(f"Test script output: {result}") + + # Try to read reward.txt first + reward_txt_result = await self.bash( + "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", sandbox_id + ) + + if reward_txt_result and reward_txt_result.strip(): + reward_value = float(reward_txt_result.strip()) + logger.info(f"Reward from reward.txt: {reward_value}") + return reward_value + + # Fall back to reward.json + reward_json_result = await self.bash( + "cat /logs/verifier/reward.json 2>/dev/null || echo ''", sandbox_id + ) + + if reward_json_result and reward_json_result.strip(): + reward_data = json.loads(reward_json_result) + reward_value = float(reward_data.get("reward", 0.0)) + logger.info(f"Reward from reward.json: {reward_value}") + return reward_value + + # No reward file found + logger.warning( + f"No reward.txt or reward.json found for task {state.get('task')}" + ) + return 0.0 + + except Exception as e: + logger.error(f"Error computing reward: {e}") + return 0.0 From 61d492ef7f9cff4e47f6e86d8a9e296d3aa5ef6d Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Tue, 18 Nov 2025 22:26:00 -0800 Subject: [PATCH 05/22] clean up and add working_dir --- verifiers/envs/harbor_env.py | 23 +++++------------------ verifiers/envs/sandbox_env.py | 13 +++++++++++-- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/verifiers/envs/harbor_env.py b/verifiers/envs/harbor_env.py index 8cc1de3e8..5d7534055 100644 --- a/verifiers/envs/harbor_env.py +++ b/verifiers/envs/harbor_env.py @@ -40,6 +40,7 @@ def __init__( cpu_cores: int = 2, memory_gb: int = 4, disk_size_gb: int = 10, + working_dir: str | None = "/app", **kwargs, ): self.dataset_path = Path(dataset_path) if dataset_path else None @@ -48,6 +49,9 @@ def __init__( self.cpu_cores = cpu_cores self.memory_gb = memory_gb self.disk_size_gb = disk_size_gb + # Set working_dir to /app by default (matches typical Dockerfile WORKDIR) + # This prevents agents from accidentally modifying /oracle or /tests + self.working_dir = working_dir dataset = self._load_harbor_dataset() rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) @@ -77,11 +81,9 @@ def _load_harbor_dataset(self) -> Dataset: if not task_dir.is_dir(): continue - # Filter by task names if specified if self.task_names and task_dir.name not in self.task_names: continue - # Check for required files task_toml = task_dir / "task.toml" instruction_md = task_dir / "instruction.md" @@ -91,14 +93,11 @@ def _load_harbor_dataset(self) -> Dataset: ) continue - # Load task config with open(task_toml, "rb") as f: config = tomli.load(f) - # Load instruction instruction = instruction_md.read_text().strip() - # Check for docker_image in config docker_image = config.get("environment", {}).get("docker_image") if not docker_image: logger.warning( @@ -107,7 +106,6 @@ def _load_harbor_dataset(self) -> Dataset: ) continue - # Create task entry task_entry = { "example_id": len(tasks), "task": task_dir.name, @@ -127,7 +125,6 @@ def _load_harbor_dataset(self) -> Dataset: logger.info(f"Loaded {len(tasks)} Harbor tasks from {self.dataset_path}") - # Convert to HuggingFace Dataset return Dataset.from_list(tasks) async def setup_state(self, state: vf.State, **kwargs) -> vf.State: @@ -135,7 +132,6 @@ async def setup_state(self, state: vf.State, **kwargs) -> vf.State: Creates the sandbox, uploads solution and test files, and prepares the environment for the agent. """ - # Get task info and update sandbox request BEFORE creating sandbox task_info = state.get("info", {}) task_dir = Path(task_info.get("task_dir", "")) docker_image = task_info.get("docker_image") @@ -144,19 +140,15 @@ async def setup_state(self, state: vf.State, **kwargs) -> vf.State: if not task_dir.exists(): raise FileNotFoundError(f"Task directory not found: {task_dir}") - # Update sandbox request with task-specific docker image if docker_image: self.sandbox_request.docker_image = docker_image - # Call parent setup to create sandbox (this will use updated docker_image) state = await super().setup_state(state, **kwargs) - # Get sandbox_id sandbox_id = state.get("sandbox_id") if not sandbox_id: raise RuntimeError("Sandbox not created in parent setup_state") - # Wait for sandbox to be ready before uploading files await self.sandbox_client.wait_for_creation(sandbox_id) # Upload solution folder to /oracle @@ -184,21 +176,18 @@ async def _upload_directory( self, sandbox_id: str, local_dir: Path, remote_path: str ): """Upload a directory to the sandbox using tar.""" - # Create tar file with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: tar_path = Path(tmp_file.name) try: - # Create tar archive with tarfile.open(tar_path, "w:gz") as tar: for item in local_dir.iterdir(): tar.add(item, arcname=item.name) - # Upload tar to sandbox (note: upload_file params are sandbox_id, remote_path, local_path) + # Upload tar to sandbox (upload_file params are sandbox_id, remote_path, local_path) remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" await self.sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) - # Extract in sandbox await self.bash( f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", sandbox_id, @@ -230,7 +219,6 @@ async def _compute_reward(self, state: vf.State) -> float: return 0.0 try: - # Run test script logger.info(f"Running tests for task {state.get('task')}") result = await self.bash("cd /tests && bash test.sh", sandbox_id) logger.debug(f"Test script output: {result}") @@ -256,7 +244,6 @@ async def _compute_reward(self, state: vf.State) -> float: logger.info(f"Reward from reward.json: {reward_value}") return reward_value - # No reward file found logger.warning( f"No reward.txt or reward.json found for task {state.get('task')}" ) diff --git a/verifiers/envs/sandbox_env.py b/verifiers/envs/sandbox_env.py index a034df2cc..7e61566ab 100644 --- a/verifiers/envs/sandbox_env.py +++ b/verifiers/envs/sandbox_env.py @@ -33,6 +33,7 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, + working_dir: str | None = None, **kwargs, ): super().__init__(**kwargs) @@ -50,6 +51,7 @@ def __init__( team_id=team_id, advanced_configs=advanced_configs, ) + self.working_dir = None # Default working directory for bash commands self.active_sandboxes = set() # Install handlers for regular exception, sigint (Ctrl-C) and sigterm (standard termination signal) @@ -76,8 +78,15 @@ async def bash(self, command: str, sandbox_id: str) -> str: ) # wait for sandbox to be created self.logger.debug(f"Waited {time.time() - s:.1f}s for sandbox to be ready") s = time.time() - self.logger.debug(f"Executing command {command} in sandbox {sandbox_id}") - results = await self.sandbox_client.execute_command(sandbox_id, command) + # Execute command with optional working directory + if self.working_dir: + self.logger.debug(f"Executing command in {self.working_dir}: {command}") + results = await self.sandbox_client.execute_command( + sandbox_id, command, working_dir=self.working_dir + ) + else: + self.logger.debug(f"Executing command {command} in sandbox {sandbox_id}") + results = await self.sandbox_client.execute_command(sandbox_id, command) e = time.time() stdout = results.stdout.strip() stderr = (results.stderr or "").strip() From c52441da047eb92989a05dadbc20c78e74ff9f6e Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Tue, 18 Nov 2025 22:36:25 -0800 Subject: [PATCH 06/22] fix working dir --- verifiers/envs/sandbox_env.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/verifiers/envs/sandbox_env.py b/verifiers/envs/sandbox_env.py index 7e61566ab..1aee1b901 100644 --- a/verifiers/envs/sandbox_env.py +++ b/verifiers/envs/sandbox_env.py @@ -33,7 +33,6 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, - working_dir: str | None = None, **kwargs, ): super().__init__(**kwargs) @@ -51,7 +50,6 @@ def __init__( team_id=team_id, advanced_configs=advanced_configs, ) - self.working_dir = None # Default working directory for bash commands self.active_sandboxes = set() # Install handlers for regular exception, sigint (Ctrl-C) and sigterm (standard termination signal) @@ -67,9 +65,11 @@ def __init__( signal.SIGTERM, lambda _, __: (self.cleanup_sandboxes(), exit(143)) ) - self.add_tool(self.bash, args_to_skip=["sandbox_id"]) + self.add_tool(self.bash, args_to_skip=["sandbox_id", "working_dir"]) - async def bash(self, command: str, sandbox_id: str) -> str: + async def bash( + self, command: str, sandbox_id: str, working_dir: str | None = None + ) -> str: """Execute `command` inside persistent sandbox container.""" # sandbox_id is passed via update_tool_args, not seen by model s = time.time() @@ -79,10 +79,10 @@ async def bash(self, command: str, sandbox_id: str) -> str: self.logger.debug(f"Waited {time.time() - s:.1f}s for sandbox to be ready") s = time.time() # Execute command with optional working directory - if self.working_dir: - self.logger.debug(f"Executing command in {self.working_dir}: {command}") + if working_dir: + self.logger.debug(f"Executing command in {working_dir}: {command}") results = await self.sandbox_client.execute_command( - sandbox_id, command, working_dir=self.working_dir + sandbox_id, command, working_dir=working_dir ) else: self.logger.debug(f"Executing command {command} in sandbox {sandbox_id}") From a96ba5ceddf22aa068bc2b977848da60b8fef837 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Tue, 18 Nov 2025 22:41:59 -0800 Subject: [PATCH 07/22] fix working dir --- verifiers/envs/harbor_env.py | 39 +++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/verifiers/envs/harbor_env.py b/verifiers/envs/harbor_env.py index 5d7534055..0e64e1b1d 100644 --- a/verifiers/envs/harbor_env.py +++ b/verifiers/envs/harbor_env.py @@ -40,7 +40,6 @@ def __init__( cpu_cores: int = 2, memory_gb: int = 4, disk_size_gb: int = 10, - working_dir: str | None = "/app", **kwargs, ): self.dataset_path = Path(dataset_path) if dataset_path else None @@ -49,9 +48,6 @@ def __init__( self.cpu_cores = cpu_cores self.memory_gb = memory_gb self.disk_size_gb = disk_size_gb - # Set working_dir to /app by default (matches typical Dockerfile WORKDIR) - # This prevents agents from accidentally modifying /oracle or /tests - self.working_dir = working_dir dataset = self._load_harbor_dataset() rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) @@ -163,8 +159,8 @@ async def setup_state(self, state: vf.State, **kwargs) -> vf.State: await self._upload_directory(sandbox_id, tests_dir, "/tests") logger.debug(f"Uploaded tests to /tests in sandbox {sandbox_id}") - # Create /logs/verifier directory for test outputs - await self.bash("mkdir -p /logs/verifier", sandbox_id) + # Create /logs/verifier directory for test outputs (run from root) + await self.bash("mkdir -p /logs/verifier", sandbox_id, working_dir=None) # Store task config in state for reward function state["harbor_config"] = config @@ -172,6 +168,25 @@ async def setup_state(self, state: vf.State, **kwargs) -> vf.State: return state + def update_tool_args( + self, + tool_name: str, + tool_args: dict, + messages: vf.Messages, + state: vf.State, + **kwargs, + ) -> dict: + """Inject working_dir=/app for agent bash calls.""" + updated_args = super().update_tool_args( + tool_name, tool_args, messages, state, **kwargs + ) + + # For bash commands, inject working_dir=/app for agent calls + if tool_name == "bash" and "working_dir" not in updated_args: + updated_args["working_dir"] = "/app" + + return updated_args + async def _upload_directory( self, sandbox_id: str, local_dir: Path, remote_path: str ): @@ -188,9 +203,11 @@ async def _upload_directory( remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" await self.sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) + # Extract in sandbox (run from root to access /tmp and create directories) await self.bash( f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", sandbox_id, + working_dir=None, ) finally: tar_path.unlink(missing_ok=True) @@ -220,12 +237,14 @@ async def _compute_reward(self, state: vf.State) -> float: try: logger.info(f"Running tests for task {state.get('task')}") - result = await self.bash("cd /tests && bash test.sh", sandbox_id) + result = await self.bash("bash test.sh", sandbox_id, working_dir="/tests") logger.debug(f"Test script output: {result}") # Try to read reward.txt first reward_txt_result = await self.bash( - "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", sandbox_id + "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", + sandbox_id, + working_dir=None, ) if reward_txt_result and reward_txt_result.strip(): @@ -235,7 +254,9 @@ async def _compute_reward(self, state: vf.State) -> float: # Fall back to reward.json reward_json_result = await self.bash( - "cat /logs/verifier/reward.json 2>/dev/null || echo ''", sandbox_id + "cat /logs/verifier/reward.json 2>/dev/null || echo ''", + sandbox_id, + working_dir=None, ) if reward_json_result and reward_json_result.strip(): From b173a7caa55f801e7471003c2f2e3f5d96ed2857 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Wed, 19 Nov 2025 14:22:27 -0800 Subject: [PATCH 08/22] revert prime sandbox version change --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 777243cc8..e389404de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "pydantic>=2.11.9", "tomli; python_version < '3.11'", "typing_extensions; python_version < '3.12'", - "prime-sandboxes>=0.2.4", + "prime-sandboxes>=0.1.0", "wget>=3.2", ] From 6e29a8fbf97fb90c55155d4ed360b5a904b1bc14 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Wed, 19 Nov 2025 14:23:42 -0800 Subject: [PATCH 09/22] revert python env change --- verifiers/envs/python_env.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/verifiers/envs/python_env.py b/verifiers/envs/python_env.py index a6d2ca5f7..d1b6148de 100644 --- a/verifiers/envs/python_env.py +++ b/verifiers/envs/python_env.py @@ -112,8 +112,6 @@ def ensure_fifo(path: str) -> None: rm -f "$command_fifo" "$response_fifo" "$ready_flag" - pip install -q numpy sympy scipy - python - <<'PY' import base64 from pathlib import Path From 8a3739d37b7192d2822051f4136e5b5b5b461d2b Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 12:51:11 -0800 Subject: [PATCH 10/22] harbor cli agent --- verifiers/envs/harbor_cli_agent_env.py | 316 +++++++++++++++++++++++++ 1 file changed, 316 insertions(+) create mode 100644 verifiers/envs/harbor_cli_agent_env.py diff --git a/verifiers/envs/harbor_cli_agent_env.py b/verifiers/envs/harbor_cli_agent_env.py new file mode 100644 index 000000000..e78ffd0c5 --- /dev/null +++ b/verifiers/envs/harbor_cli_agent_env.py @@ -0,0 +1,316 @@ +import asyncio +import json +import logging +import tarfile +import tempfile +import uuid +from pathlib import Path +from typing import Any + +import tomli +from datasets import Dataset +from prime_sandboxes import ( + AsyncSandboxClient, + CreateSandboxRequest, +) + +import verifiers as vf +from verifiers.envs.cli_agent_env import CliAgentEnv + +logger = logging.getLogger(__name__) + + +class HarborCliAgentEnv(CliAgentEnv): + """CliAgentEnv subclass that loads Harbor-format tasks.""" + + def __init__( + self, + dataset_path: str | Path, + tasks: list[str] | None = None, + agent_workdir: str = "/app", + default_docker_image: str | None = None, + **kwargs, + ): + self.dataset_path = Path(dataset_path) + self.task_names = tasks + self.agent_workdir = agent_workdir + self.default_docker_image = default_docker_image + + if default_docker_image and "docker_image" not in kwargs: + kwargs["docker_image"] = default_docker_image + + dataset = self._load_harbor_dataset() + rubric = vf.Rubric(funcs=[self.harbor_reward], weights=[1.0]) + + super().__init__(dataset=dataset, rubric=rubric, **kwargs) + + def _load_harbor_dataset(self) -> Dataset: + """Load Harbor tasks from dataset directory into a Dataset with prompts.""" + if not self.dataset_path.exists(): + raise FileNotFoundError(f"Dataset path not found: {self.dataset_path}") + + tasks = [] + for task_dir in sorted(self.dataset_path.iterdir()): + if not task_dir.is_dir(): + continue + + if self.task_names and task_dir.name not in self.task_names: + continue + + task_toml = task_dir / "task.toml" + instruction_md = task_dir / "instruction.md" + + if not task_toml.exists() or not instruction_md.exists(): + logger.warning( + f"Skipping {task_dir.name}: missing task.toml or instruction.md" + ) + continue + + with open(task_toml, "rb") as f: + config = tomli.load(f) + + instruction = instruction_md.read_text().strip() + + docker_image = ( + config.get("environment", {}).get("docker_image") + or self.default_docker_image + ) + if not docker_image: + logger.warning( + f"Skipping {task_dir.name}: no environment.docker_image in task.toml " + "and no default_docker_image provided. " + "Run harbor_build.py first to build/push images." + ) + continue + + # TODO: remove this prompt + messages = [ + { + "role": "system", + "content": ( + "You are an autonomous agent inside a Harbor sandbox. " + f"Work in {self.agent_workdir}, follow the user instruction, write /tmp/vf_complete " + "when finished, and do not tamper with /tests or /oracle." + ), + }, + {"role": "user", "content": instruction}, + ] + + task_entry = { + "example_id": len(tasks), + "task": task_dir.name, + "prompt": messages, + "info": { + "task_dir": str(task_dir), + "docker_image": docker_image, + "config": config, + }, + } + + tasks.append(task_entry) + + if not tasks: + raise ValueError(f"No valid Harbor tasks found in {self.dataset_path}") + + logger.info(f"Loaded {len(tasks)} Harbor tasks from {self.dataset_path}") + return Dataset.from_list(tasks) + + async def setup_state(self, state: vf.State) -> vf.State: + """Create sandbox per rollout with Harbor assets uploaded.""" + # Skip CliAgentEnv.setup_state (needs per-task docker image); call MultiTurnEnv.setup_state + state = await super(CliAgentEnv, self).setup_state(state) # type: ignore[misc] + + task_info: dict[str, Any] = state.get("info", {}) or {} + task_dir = Path(task_info.get("task_dir", "")) + config = task_info.get("config", {}) + docker_image = task_info.get("docker_image") or self.docker_image + + if not task_dir.exists(): + raise FileNotFoundError(f"Task directory not found: {task_dir}") + + rollout_id = f"rollout_{uuid.uuid4().hex[:8]}" + state["rollout_id"] = rollout_id + + await self._ensure_interception_server() + + tunnel_url: str | None = None + if self.interception_host is None: + tunnel_url = await self._get_tunnel_url() + state["interception_base_url"] = f"{tunnel_url}/rollout/{rollout_id}/v1" + else: + state["interception_base_url"] = ( + f"http://{self.interception_host}:{self.interception_port}/rollout/{rollout_id}/v1" + ) + + env_vars = dict(self.environment_vars) if self.environment_vars else {} + env_vars["OPENAI_BASE_URL"] = state["interception_base_url"] + model = state.get("model") + if model: + env_vars["OPENAI_MODEL"] = model + env_vars.setdefault("HARBOR_TASK_NAME", state.get("task", "")) + env_vars.setdefault("HARBOR_TASK_DIR", "/task") + env_vars.setdefault("HARBOR_INSTRUCTION_PATH", "/task/instruction.md") + if self.agent_workdir: + env_vars.setdefault("AGENT_WORKDIR", self.agent_workdir) + + sandbox_client = AsyncSandboxClient() + sandbox_request = CreateSandboxRequest( + name=f"harbor-cli-agent-{rollout_id}", + docker_image=docker_image, + start_command=self.start_command, + cpu_cores=self.cpu_cores, + memory_gb=self.memory_gb, + disk_size_gb=self.disk_size_gb, + gpu_count=self.gpu_count, + timeout_minutes=self.timeout_minutes, + environment_vars=env_vars, + team_id=self.team_id, + advanced_configs=self.advanced_configs, + ) + + logger.debug( + f"Creating Harbor sandbox with OPENAI_BASE_URL={env_vars.get('OPENAI_BASE_URL')} " + f"docker_image={docker_image}" + ) + sandbox = await sandbox_client.create(sandbox_request) + state["sandbox_id"] = sandbox.id + await sandbox_client.wait_for_creation(sandbox.id) + + await self._prepare_harbor_task(sandbox_client, sandbox.id, task_dir) + state["harbor_config"] = config + state["harbor_task_dir"] = str(task_dir) + + request_id_queue: asyncio.Queue[str] = asyncio.Queue() + state["request_id_queue"] = request_id_queue + state["current_request_id"] = None + state["tunnel_url"] = tunnel_url if self.interception_host is None else None + self.active_rollouts[rollout_id] = { + "request_id_queue": request_id_queue, + "current_request_id": None, + } + + return state + + async def _prepare_harbor_task( + self, sandbox_client: AsyncSandboxClient, sandbox_id: str, task_dir: Path + ) -> None: + """Upload solution/tests and make log directory.""" + solution_dir = task_dir / "solution" + tests_dir = task_dir / "tests" + instruction_path = task_dir / "instruction.md" + task_toml_path = task_dir / "task.toml" + + if solution_dir.exists(): + await self._upload_directory( + sandbox_client, sandbox_id, solution_dir, "/oracle" + ) + logger.debug(f"Uploaded solution for {task_dir.name} to /oracle") + + if tests_dir.exists(): + await self._upload_directory( + sandbox_client, sandbox_id, tests_dir, "/tests" + ) + logger.debug(f"Uploaded tests for {task_dir.name} to /tests") + + mkdir_cmd = f"mkdir -p /logs/verifier /task {self.agent_workdir}" + await sandbox_client.execute_command(sandbox_id, mkdir_cmd, working_dir=None) + + if instruction_path.exists(): + await sandbox_client.upload_file( + sandbox_id, "/task/instruction.md", str(instruction_path) + ) + if task_toml_path.exists(): + await sandbox_client.upload_file( + sandbox_id, "/task/task.toml", str(task_toml_path) + ) + + async def _upload_directory( + self, + sandbox_client: AsyncSandboxClient, + sandbox_id: str, + local_dir: Path, + remote_path: str, + ) -> None: + """Tar + upload a directory into the sandbox.""" + with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_file: + tar_path = Path(tmp_file.name) + + try: + with tarfile.open(tar_path, "w:gz") as tar: + for item in local_dir.iterdir(): + tar.add(item, arcname=item.name) + + remote_tar = f"/tmp/upload_{local_dir.name}.tar.gz" + await sandbox_client.upload_file(sandbox_id, remote_tar, str(tar_path)) + await sandbox_client.execute_command( + sandbox_id, + f"mkdir -p {remote_path} && tar -xzf {remote_tar} -C {remote_path} && rm {remote_tar}", + working_dir=None, + ) + finally: + tar_path.unlink(missing_ok=True) + + async def post_rollout(self, state: vf.State): + """Run Harbor tests to compute reward before sandbox destruction.""" + reward = await self._compute_reward(state) + state["harbor_reward_value"] = reward + state["reward"] = reward + + async def harbor_reward(self, state: vf.State, **kwargs) -> float: + return state.get("harbor_reward_value", 0.0) + + async def _compute_reward(self, state: vf.State) -> float: + """ + Execute Harbor tests (tests/test.sh) inside the sandbox to compute reward. + Prioritizes /logs/verifier/reward.txt, falling back to reward.json. + """ + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + logger.error("No sandbox_id in state") + return 0.0 + + sandbox_client = AsyncSandboxClient() + try: + logger.info(f"Running Harbor tests for task {state.get('task')}") + await sandbox_client.execute_command( + sandbox_id, "bash test.sh", working_dir="/tests" + ) + + reward_txt = await sandbox_client.execute_command( + sandbox_id, + "cat /logs/verifier/reward.txt 2>/dev/null || echo ''", + working_dir=None, + ) + reward_txt_val = self._stdout_text(reward_txt) + if reward_txt_val: + value = float(reward_txt_val) + logger.info(f"Reward from reward.txt: {value}") + return value + + reward_json = await sandbox_client.execute_command( + sandbox_id, + "cat /logs/verifier/reward.json 2>/dev/null || echo ''", + working_dir=None, + ) + reward_json_val = self._stdout_text(reward_json) + if reward_json_val: + data = json.loads(reward_json_val) + value = float(data.get("reward", 0.0)) + logger.info(f"Reward from reward.json: {value}") + return value + + logger.warning("No reward.txt or reward.json produced by Harbor tests") + return 0.0 + except Exception as e: + logger.error(f"Error computing Harbor reward: {e}") + return 0.0 + + @staticmethod + def _stdout_text(result: Any) -> str: + """Extract trimmed stdout from a Sandbox command result.""" + stdout_val = getattr(result, "stdout", "") + if stdout_val is None: + return "" + if isinstance(stdout_val, str): + return stdout_val.strip() + return str(stdout_val).strip() From e7445473c34fdb136291468e77f3ec6190259d96 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 14:05:41 -0800 Subject: [PATCH 11/22] log requests --- verifiers/envs/cli_agent_env.py | 67 +++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 42153c20d..d805c9105 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -25,6 +25,37 @@ logger = logging.getLogger(__name__) +def _truncate(text: str, max_len: int = 500) -> str: + """Truncate text for logging.""" + if len(text) <= max_len: + return text + return text[:max_len] + f"... ({len(text) - max_len} more chars)" + + +def _format_message(msg: dict) -> str: + """Format a message for logging.""" + role = msg.get("role", "unknown") + content = msg.get("content", "") + + # Handle tool calls + tool_calls = msg.get("tool_calls", []) + if tool_calls: + tools_summary = [] + for tc in tool_calls: + func = tc.get("function", {}) + name = func.get("name", "unknown") + args = func.get("arguments", "") + tools_summary.append(f"{name}({_truncate(args, 200)})") + return f"[{role}] tool_calls: {', '.join(tools_summary)}" + + # Handle tool results + if role == "tool": + tool_call_id = msg.get("tool_call_id", "") + return f"[{role}:{tool_call_id}] {_truncate(str(content), 300)}" + + return f"[{role}] {_truncate(str(content), 500)}" + + class CliAgentEnv(vf.MultiTurnEnv): """ Environment for running full agent code inside sandboxes. @@ -55,6 +86,7 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, + log_requests: bool = True, **kwargs, ): super().__init__(max_turns=max_turns, message_type="chat", **kwargs) @@ -77,12 +109,14 @@ def __init__( self.environment_vars = environment_vars self.team_id = team_id self.advanced_configs = advanced_configs + self.log_requests = log_requests self.active_rollouts: dict[str, dict[str, Any]] = {} self.intercepts: dict[str, dict[str, Any]] = {} # request_id -> intercept data self.interception_server: Any = None self._server_lock = asyncio.Lock() self._server_runner: Any = None self._server_site: Any = None + self._request_counts: dict[str, int] = {} # rollout_id -> request count def _ensure_cloudflared_installed(self) -> str: """Install cloudflared if not already installed. Returns path to cloudflared binary.""" @@ -318,8 +352,15 @@ async def get_prompt_messages(self, state: State) -> Messages: process request immediately with injected sampling_args, store response in intercept, return messages. """ + rollout_id = state.get("rollout_id", "unknown") request_id_queue = state["request_id_queue"] + # Track request count for logging + if rollout_id not in self._request_counts: + self._request_counts[rollout_id] = 0 + self._request_counts[rollout_id] += 1 + req_num = self._request_counts[rollout_id] + request_id = await asyncio.wait_for( request_id_queue.get(), timeout=self.request_timeout, @@ -333,6 +374,16 @@ async def get_prompt_messages(self, state: State) -> Messages: request_tools = intercept.get("tools") effective_sampling_args = state.get("sampling_args") or {} + # Log the intercepted request + if self.log_requests: + logger.info( + f"[Request #{req_num}] model={request_model}, " + f"messages={len(messages)}, tools={len(request_tools or [])}" + ) + # Log the last few messages (most relevant context) + for msg in messages[-3:]: + logger.info(f" {_format_message(msg)}") + client = state.get("client") if client is None: raise RuntimeError("Client not set in state") @@ -347,6 +398,18 @@ async def get_prompt_messages(self, state: State) -> Messages: message_type=None, ) + # Log the response + if self.log_requests and response.choices: + choice = response.choices[0] + msg = choice.message + if msg.tool_calls: + tools = [f"{tc.function.name}(...)" for tc in msg.tool_calls] + logger.info(f"[Response #{req_num}] tool_calls: {', '.join(tools)}") + elif msg.content: + logger.info(f"[Response #{req_num}] {_truncate(msg.content, 200)}") + else: + logger.info(f"[Response #{req_num}] (empty)") + intercept["response_future"].set_result(response) intercept["response"] = response state["current_request_id"] = request_id @@ -482,6 +545,10 @@ async def cleanup_interception_context(self, state: State): del self.intercepts[request_id] del self.active_rollouts[rollout_id] + # Clean up request count + if rollout_id and rollout_id in self._request_counts: + del self._request_counts[rollout_id] + # Decrement active rollouts for the tunnel used by this rollout tunnel_url = state.get("tunnel_url") if tunnel_url: From c21f3d1ba04627ef2a9aff3191d146f91b70717a Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 14:06:49 -0800 Subject: [PATCH 12/22] fix comment --- verifiers/envs/harbor_cli_agent_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/harbor_cli_agent_env.py b/verifiers/envs/harbor_cli_agent_env.py index e78ffd0c5..612dc0203 100644 --- a/verifiers/envs/harbor_cli_agent_env.py +++ b/verifiers/envs/harbor_cli_agent_env.py @@ -169,7 +169,7 @@ async def setup_state(self, state: vf.State) -> vf.State: ) logger.debug( - f"Creating Harbor sandbox with OPENAI_BASE_URL={env_vars.get('OPENAI_BASE_URL')} " + f"Creating sandbox with OPENAI_BASE_URL={env_vars.get('OPENAI_BASE_URL')} " f"docker_image={docker_image}" ) sandbox = await sandbox_client.create(sandbox_request) From 9ed4b5d1f9ed5f3004ef48bce325ed802873d2be Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 15:40:38 -0800 Subject: [PATCH 13/22] better logs --- verifiers/envs/cli_agent_env.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index d805c9105..8ad3d22b4 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -3,6 +3,7 @@ import subprocess import time import uuid +import json from typing import Any from aiohttp import web @@ -514,6 +515,9 @@ async def _handle_intercepted_request(self, request: Any) -> Any: response_dict = ( response.model_dump() if hasattr(response, "model_dump") else dict(response) ) + logger.info( + f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" + ) return web.json_response(response_dict) # type: ignore @vf.teardown From 464998becca17bdea8d8a28424d5dc9e2c3e8f88 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 15:55:32 -0800 Subject: [PATCH 14/22] remove reasoning? --- verifiers/envs/cli_agent_env.py | 91 ++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 8ad3d22b4..1b07f81c0 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -33,6 +33,91 @@ def _truncate(text: str, max_len: int = 500) -> str: return text[:max_len] + f"... ({len(text) - max_len} more chars)" +def _normalize_chat_completion_response(response_dict: dict) -> dict: + """ + Normalize a chat completion response to standard OpenAI format. + + Removes non-standard fields that may confuse agents like Codex/Cline + that expect strict OpenAI chat completions API format. + """ + # Standard top-level fields for chat.completion + standard_top_level = { + "id", + "object", + "created", + "model", + "choices", + "usage", + "system_fingerprint", + } + + # Standard message fields + standard_message_fields = { + "role", + "content", + "tool_calls", + "function_call", + "refusal", + "name", + } + + # Standard tool_call fields + standard_tool_call_fields = {"id", "type", "function"} + + # Standard function fields within tool_call + standard_function_fields = {"name", "arguments"} + + # Filter top-level fields + normalized = {k: v for k, v in response_dict.items() if k in standard_top_level} + + # Normalize choices + if "choices" in response_dict: + normalized_choices = [] + for choice in response_dict["choices"]: + normalized_choice = { + "index": choice.get("index", 0), + "finish_reason": choice.get("finish_reason"), + "logprobs": choice.get("logprobs"), + } + + # Normalize message + if "message" in choice: + msg = choice["message"] + normalized_msg = { + k: v for k, v in msg.items() if k in standard_message_fields + } + + # Normalize tool_calls if present + if "tool_calls" in msg and msg["tool_calls"]: + normalized_tool_calls = [] + for tc in msg["tool_calls"]: + normalized_tc = { + k: v + for k, v in tc.items() + if k in standard_tool_call_fields + } + # Normalize function within tool_call + if "function" in tc: + normalized_tc["function"] = { + k: v + for k, v in tc["function"].items() + if k in standard_function_fields + } + normalized_tool_calls.append(normalized_tc) + normalized_msg["tool_calls"] = normalized_tool_calls + + normalized_choice["message"] = normalized_msg + + normalized_choices.append(normalized_choice) + normalized["choices"] = normalized_choices + + # Ensure object type is set + if "object" not in normalized: + normalized["object"] = "chat.completion" + + return normalized + + def _format_message(msg: dict) -> str: """Format a message for logging.""" role = msg.get("role", "unknown") @@ -515,10 +600,12 @@ async def _handle_intercepted_request(self, request: Any) -> Any: response_dict = ( response.model_dump() if hasattr(response, "model_dump") else dict(response) ) + # Normalize to standard OpenAI format (removes non-standard fields like reasoning, reasoning_details) + normalized_response = _normalize_chat_completion_response(response_dict) logger.info( - f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" + f"Response to agent: {json.dumps(normalized_response, indent=2, default=str)[:2000]}" ) - return web.json_response(response_dict) # type: ignore + return web.json_response(normalized_response) # type: ignore @vf.teardown async def teardown_tunnel(self): From c0b1f28f894b3fccbd950405434946fe28955549 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:00:05 -0800 Subject: [PATCH 15/22] usage --- verifiers/envs/cli_agent_env.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 1b07f81c0..83cda2b5e 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -115,6 +115,14 @@ def _normalize_chat_completion_response(response_dict: dict) -> dict: if "object" not in normalized: normalized["object"] = "chat.completion" + # Ensure usage is present (some agents like Codex may fail on null usage) + if normalized.get("usage") is None: + normalized["usage"] = { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + } + return normalized From 94417fc694f9cf2c09db5ecb5591ced3b09b3f92 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:03:16 -0800 Subject: [PATCH 16/22] streaming? --- verifiers/envs/cli_agent_env.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 83cda2b5e..f05fba97d 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -584,6 +584,14 @@ async def _handle_intercepted_request(self, request: Any) -> Any: {"error": f"Invalid JSON: {e}"}, status=400 ) + # Log request details including stream parameter + stream_requested = request_body.get("stream", False) + logger.info( + f"Intercepted request: stream={stream_requested}, " + f"model={request_body.get('model')}, " + f"messages={len(request_body.get('messages', []))}" + ) + request_id = f"req_{uuid.uuid4().hex[:8]}" intercept = { "request_id": request_id, From 0a0efd4f95720167e91295c187360ce0c34cdd73 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:18:13 -0800 Subject: [PATCH 17/22] create sse response --- verifiers/envs/cli_agent_env.py | 123 ++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index f05fba97d..fc8868de6 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -592,6 +592,10 @@ async def _handle_intercepted_request(self, request: Any) -> Any: f"messages={len(request_body.get('messages', []))}" ) + # Force non-streaming - we don't support SSE streaming yet + # The response will be converted to SSE format if stream was requested + request_body["stream"] = False + request_id = f"req_{uuid.uuid4().hex[:8]}" intercept = { "request_id": request_id, @@ -599,6 +603,7 @@ async def _handle_intercepted_request(self, request: Any) -> Any: "messages": request_body["messages"], "model": request_body.get("model"), "tools": request_body.get("tools"), + "stream_requested": stream_requested, # Remember if client wanted streaming "response_future": asyncio.Future(), } @@ -621,8 +626,126 @@ async def _handle_intercepted_request(self, request: Any) -> Any: logger.info( f"Response to agent: {json.dumps(normalized_response, indent=2, default=str)[:2000]}" ) + + # If client requested streaming, convert to SSE format + if intercept.get("stream_requested", False): + return self._create_sse_response(normalized_response) + return web.json_response(normalized_response) # type: ignore + def _create_sse_response(self, response_dict: dict) -> web.Response: + """Convert a chat completion response to SSE streaming format.""" + response_id = response_dict.get("id", "chatcmpl-unknown") + created = response_dict.get("created", 0) + model = response_dict.get("model", "unknown") + + chunks = [] + + for choice in response_dict.get("choices", []): + message = choice.get("message", {}) + finish_reason = choice.get("finish_reason") + index = choice.get("index", 0) + + # First chunk: role + first_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"role": message.get("role", "assistant")}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(first_chunk)}\n\n") + + # Content chunk (if any) + content = message.get("content") + if content: + content_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"content": content}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(content_chunk)}\n\n") + + # Tool calls chunks (if any) + tool_calls = message.get("tool_calls", []) + if tool_calls: + # Send tool call with full info in one chunk + tc_delta = [] + for i, tc in enumerate(tool_calls): + tc_delta.append( + { + "index": i, + "id": tc.get("id"), + "type": tc.get("type", "function"), + "function": { + "name": tc.get("function", {}).get("name", ""), + "arguments": tc.get("function", {}).get( + "arguments", "" + ), + }, + } + ) + + tool_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {"tool_calls": tc_delta}, + "finish_reason": None, + } + ], + } + chunks.append(f"data: {json.dumps(tool_chunk)}\n\n") + + # Final chunk with finish_reason + final_chunk = { + "id": response_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [ + { + "index": index, + "delta": {}, + "finish_reason": finish_reason, + } + ], + } + chunks.append(f"data: {json.dumps(final_chunk)}\n\n") + + # End of stream + chunks.append("data: [DONE]\n\n") + + body = "".join(chunks) + logger.debug(f"SSE response body:\n{body[:1500]}") + return web.Response( + body=body, + status=200, + content_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + @vf.teardown async def teardown_tunnel(self): """Stop all cloudflared tunnel processes""" From 634a47ebd5b0d415deb1bcf6c58c834312bec409 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:30:07 -0800 Subject: [PATCH 18/22] remove normalization --- verifiers/envs/cli_agent_env.py | 102 ++------------------------------ 1 file changed, 4 insertions(+), 98 deletions(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index fc8868de6..24762d833 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -33,99 +33,6 @@ def _truncate(text: str, max_len: int = 500) -> str: return text[:max_len] + f"... ({len(text) - max_len} more chars)" -def _normalize_chat_completion_response(response_dict: dict) -> dict: - """ - Normalize a chat completion response to standard OpenAI format. - - Removes non-standard fields that may confuse agents like Codex/Cline - that expect strict OpenAI chat completions API format. - """ - # Standard top-level fields for chat.completion - standard_top_level = { - "id", - "object", - "created", - "model", - "choices", - "usage", - "system_fingerprint", - } - - # Standard message fields - standard_message_fields = { - "role", - "content", - "tool_calls", - "function_call", - "refusal", - "name", - } - - # Standard tool_call fields - standard_tool_call_fields = {"id", "type", "function"} - - # Standard function fields within tool_call - standard_function_fields = {"name", "arguments"} - - # Filter top-level fields - normalized = {k: v for k, v in response_dict.items() if k in standard_top_level} - - # Normalize choices - if "choices" in response_dict: - normalized_choices = [] - for choice in response_dict["choices"]: - normalized_choice = { - "index": choice.get("index", 0), - "finish_reason": choice.get("finish_reason"), - "logprobs": choice.get("logprobs"), - } - - # Normalize message - if "message" in choice: - msg = choice["message"] - normalized_msg = { - k: v for k, v in msg.items() if k in standard_message_fields - } - - # Normalize tool_calls if present - if "tool_calls" in msg and msg["tool_calls"]: - normalized_tool_calls = [] - for tc in msg["tool_calls"]: - normalized_tc = { - k: v - for k, v in tc.items() - if k in standard_tool_call_fields - } - # Normalize function within tool_call - if "function" in tc: - normalized_tc["function"] = { - k: v - for k, v in tc["function"].items() - if k in standard_function_fields - } - normalized_tool_calls.append(normalized_tc) - normalized_msg["tool_calls"] = normalized_tool_calls - - normalized_choice["message"] = normalized_msg - - normalized_choices.append(normalized_choice) - normalized["choices"] = normalized_choices - - # Ensure object type is set - if "object" not in normalized: - normalized["object"] = "chat.completion" - - # Ensure usage is present (some agents like Codex may fail on null usage) - if normalized.get("usage") is None: - normalized["usage"] = { - "prompt_tokens": 0, - "completion_tokens": 0, - "total_tokens": 0, - } - - return normalized - - def _format_message(msg: dict) -> str: """Format a message for logging.""" role = msg.get("role", "unknown") @@ -621,17 +528,16 @@ async def _handle_intercepted_request(self, request: Any) -> Any: response_dict = ( response.model_dump() if hasattr(response, "model_dump") else dict(response) ) - # Normalize to standard OpenAI format (removes non-standard fields like reasoning, reasoning_details) - normalized_response = _normalize_chat_completion_response(response_dict) + logger.info( - f"Response to agent: {json.dumps(normalized_response, indent=2, default=str)[:2000]}" + f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" ) # If client requested streaming, convert to SSE format if intercept.get("stream_requested", False): - return self._create_sse_response(normalized_response) + return self._create_sse_response(response_dict) - return web.json_response(normalized_response) # type: ignore + return web.json_response(response_dict) # type: ignore def _create_sse_response(self, response_dict: dict) -> web.Response: """Convert a chat completion response to SSE streaming format.""" From 7beaaca824c38e089d6afe9e6a593c92a5859376 Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:30:46 -0800 Subject: [PATCH 19/22] log requests false --- verifiers/envs/cli_agent_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 24762d833..4d74b3e19 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -87,7 +87,7 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, - log_requests: bool = True, + log_requests: bool = False, **kwargs, ): super().__init__(max_turns=max_turns, message_type="chat", **kwargs) From 42935372aa5f38c29e83ab538f65fc5cfa25332d Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Mon, 24 Nov 2025 16:33:56 -0800 Subject: [PATCH 20/22] rm json log --- verifiers/envs/cli_agent_env.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 4d74b3e19..9fd61f7ff 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -87,7 +87,7 @@ def __init__( environment_vars: dict[str, str] | None = None, team_id: str | None = None, advanced_configs: AdvancedConfigs | None = None, - log_requests: bool = False, + log_requests: bool = True, **kwargs, ): super().__init__(max_turns=max_turns, message_type="chat", **kwargs) @@ -529,9 +529,9 @@ async def _handle_intercepted_request(self, request: Any) -> Any: response.model_dump() if hasattr(response, "model_dump") else dict(response) ) - logger.info( - f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" - ) + # logger.info( + # f"Response to agent: {json.dumps(response_dict, indent=2, default=str)[:2000]}" + # ) # If client requested streaming, convert to SSE format if intercept.get("stream_requested", False): From 9bec10d45a09fa0ea0082ab3ef0e4c5636bf622c Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Wed, 3 Dec 2025 17:40:13 -0800 Subject: [PATCH 21/22] polling --- verifiers/envs/cli_agent_env.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index 9fd61f7ff..ca328cc20 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -362,10 +362,33 @@ async def get_prompt_messages(self, state: State) -> Messages: self._request_counts[rollout_id] += 1 req_num = self._request_counts[rollout_id] - request_id = await asyncio.wait_for( - request_id_queue.get(), - timeout=self.request_timeout, - ) + # Poll for requests while checking completion periodically + # This avoids blocking for the full request_timeout when agent finishes + poll_interval = 5.0 # Check completion every 5 seconds + elapsed = 0.0 + request_id = None + while elapsed < self.request_timeout: + try: + request_id = await asyncio.wait_for( + request_id_queue.get(), + timeout=poll_interval, + ) + break # Got a request, continue processing + except asyncio.TimeoutError: + elapsed += poll_interval + # Check if agent signaled completion + if await self.agent_signaled_completion(state): + logger.debug("Agent signaled completion while waiting for request") + raise StopAsyncIteration("Agent completed") + # Check timeout + if await self.timeout_reached(state): + logger.debug("Timeout reached while waiting for request") + raise StopAsyncIteration("Timeout reached") + + if request_id is None: + raise asyncio.TimeoutError( + f"No request received within {self.request_timeout}s" + ) intercept = self.intercepts[request_id] messages = intercept["messages"] From 1926991e8cb2353e5388f4d2f26e6056d745caeb Mon Sep 17 00:00:00 2001 From: Cooper Miller Date: Wed, 3 Dec 2025 17:48:37 -0800 Subject: [PATCH 22/22] fix exit handling --- verifiers/envs/cli_agent_env.py | 35 +++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/verifiers/envs/cli_agent_env.py b/verifiers/envs/cli_agent_env.py index ca328cc20..31eaae70f 100644 --- a/verifiers/envs/cli_agent_env.py +++ b/verifiers/envs/cli_agent_env.py @@ -379,11 +379,14 @@ async def get_prompt_messages(self, state: State) -> Messages: # Check if agent signaled completion if await self.agent_signaled_completion(state): logger.debug("Agent signaled completion while waiting for request") - raise StopAsyncIteration("Agent completed") + # Set flag for early exit - stop conditions will handle termination + state["_cli_agent_completed"] = True + return [] # Return empty messages, stop condition will trigger # Check timeout if await self.timeout_reached(state): logger.debug("Timeout reached while waiting for request") - raise StopAsyncIteration("Timeout reached") + state["_cli_agent_completed"] = True + return [] if request_id is None: raise asyncio.TimeoutError( @@ -454,6 +457,29 @@ async def get_model_response( Return cached response if available (set by get_prompt_messages). Otherwise fall back to parent implementation. """ + # If prompt is empty, we're in early-exit mode - return a dummy response + # The stop condition will terminate the rollout on the next iteration + if not prompt: + from openai.types.chat import ChatCompletion, ChatCompletionMessage + from openai.types.chat.chat_completion import Choice + + return ChatCompletion( + id="cli_agent_early_exit", + choices=[ + Choice( + finish_reason="stop", + index=0, + message=ChatCompletionMessage( + role="assistant", + content="", + ), + ) + ], + created=int(time.time()), + model=model, + object="chat.completion", + ) + for request_id, intercept in list(self.intercepts.items()): rollout_id = intercept.get("rollout_id") if rollout_id and rollout_id in self.active_rollouts: @@ -719,6 +745,11 @@ async def cleanup_interception_context(self, state: State): ) break + @vf.stop + async def early_exit_flag_set(self, state: State) -> bool: + """Check if early exit flag was set (by completion detection in get_prompt_messages)""" + return state.get("_cli_agent_completed", False) + @vf.stop async def agent_signaled_completion(self, state: State) -> bool: """Check for /tmp/vf_complete marker file"""