5 changes: 3 additions & 2 deletions benchmarks/swe_bench/run_nano_eval.py
@@ -23,7 +23,8 @@ def run_evaluation(endpoint: str, model_name: str, subset: str, split: str, slic
"""Run nano_agent on SWE-bench tasks and save predictions using a process pool."""

# Load SWE-bench dataset
dataset = load_dataset(f"princeton-nlp/SWE-bench_{subset}", split=split)
dataset_name = f"princeton-nlp/SWE-bench_{subset}"
dataset = load_dataset(dataset_name, split=split)

# Parse slice
# Supported forms:
@@ -79,7 +80,7 @@ def run_evaluation(endpoint: str, model_name: str, subset: str, split: str, slic
print(f"Starting processing {len(inputs)} instances with {max_workers} workers...")
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_instance_id = {
executor.submit(_process_one, datum, config): datum["instance_id"] for datum in inputs
executor.submit(_process_one, datum, config, dataset_name): datum["instance_id"] for datum in inputs
}

completed = 0
72 changes: 72 additions & 0 deletions scripts/pull_swegym_images.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Script to pull all SWE-Gym images using apptainer pull to populate the cache.
Usage: python3 scripts/pull_swegym_images.py --dataset SWE-Gym/SWE-Gym --shard-id 0 --num-shards 10
"""

import argparse
import subprocess
import sys
import os
from datasets import load_dataset

def main():
parser = argparse.ArgumentParser(description="Pull SWE-Gym images for Apptainer cache")
parser.add_argument("--dataset", default="SWE-Gym/SWE-Gym",
help="SWE-Gym dataset name (e.g., SWE-Gym/SWE-Gym or SWE-Gym/SWE-Gym-Lite)")
parser.add_argument("--split", default="train", help="Dataset split")
parser.add_argument("--temp-sif", default="/proj/berzelius-2024-336/users/x_andaf/CodeRepairRL/temp.sif",
help="Temporary SIF file path")
parser.add_argument("--shard-id", type=int, default=0, help="Shard ID for distributed pulling")
parser.add_argument("--num-shards", type=int, default=1, help="Total number of shards")
args = parser.parse_args()

print(f"Loading dataset {args.dataset} split {args.split}...")
try:
dataset = load_dataset(args.dataset, split=args.split)
except Exception as e:
print(f"Error loading dataset: {e}")
sys.exit(1)

# Sharding logic
if args.num_shards > 1:
dataset = dataset.shard(num_shards=args.num_shards, index=args.shard_id)
print(f"Processing shard {args.shard_id}/{args.num_shards} with {len(dataset)} instances.")
else:
print(f"Found {len(dataset)} instances.")

temp_sif = args.temp_sif

# Ensure directory for temp file exists
os.makedirs(os.path.dirname(temp_sif), exist_ok=True)

for i, instance in enumerate(dataset):
instance_id = instance["instance_id"]
# SWE-Gym format: xingyaoww/sweb.eval.x86_64.{instance_id_with_underscores}
# Replace "__" with "_s_" in instance_id
transformed_id = instance_id.replace("__", "_s_").lower()
image_uri = f"docker://xingyaoww/sweb.eval.x86_64.{transformed_id}"

print(f"[{i+1}/{len(dataset)}] Pulling {image_uri}...")
try:
# Pull to a temporary file to populate the cache
# We can overwrite the same file since we are running sequentially
subprocess.run(["apptainer", "pull", "--force", temp_sif, image_uri], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to pull {image_uri}: {e}", file=sys.stderr)
except KeyboardInterrupt:
print("\nInterrupted by user.")
break

# Cleanup
if os.path.exists(temp_sif):
try:
os.remove(temp_sif)
except OSError:
pass

print("Done.")

if __name__ == "__main__":
main()

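For reference, the image URI construction in the loop above behaves as in the following minimal sketch (the instance ID is illustrative, not taken from the dataset):

    # Illustrative only: the instance_id value below is hypothetical.
    instance_id = "getmoto__moto-1234"
    transformed_id = instance_id.replace("__", "_s_").lower()  # -> "getmoto_s_moto-1234"
    image_uri = f"docker://xingyaoww/sweb.eval.x86_64.{transformed_id}"
    # -> "docker://xingyaoww/sweb.eval.x86_64.getmoto_s_moto-1234"
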
32 changes: 32 additions & 0 deletions scripts/pull_swegym_images_job.sh
@@ -0,0 +1,32 @@
#!/bin/bash
#SBATCH --job-name=pull-swegym-images
#SBATCH --output=logs/pull_swegym_%A_%a.out
#SBATCH --error=logs/pull_swegym_%A_%a.err
#SBATCH --nodes=1
#SBATCH -C "thin"
#SBATCH --gpus 1
#SBATCH --time=06:00:00
#SBATCH --array=0-19

set -euo pipefail

# Configuration
DATASET="SWE-Gym/SWE-Gym"
SPLIT="train"
TOTAL_WORKERS=20
WORKER_ID=${SLURM_ARRAY_TASK_ID:-0}

echo "Starting worker ${WORKER_ID}/${TOTAL_WORKERS} for SWE-Gym ${DATASET}..."

# Use a unique temp file for this job to avoid conflicts
TEMP_SIF="/proj/berzelius-2024-336/users/x_andaf/CodeRepairRL/temp_pull_swegym_${SLURM_JOB_ID}_${WORKER_ID}.sif"

uv run scripts/pull_swegym_images.py \
--dataset "$DATASET" \
--split "$SPLIT" \
--temp-sif "$TEMP_SIF" \
--shard-id "$WORKER_ID" \
--num-shards "$TOTAL_WORKERS"

echo "Worker ${WORKER_ID} finished."

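The 20-way SLURM array matches the sharding logic in scripts/pull_swegym_images.py: each array task pulls only the images in its own shard. A minimal sketch of that split, assuming the datasets library is installed:

    # Sketch of the shard split driven by SLURM_ARRAY_TASK_ID (0..19).
    from datasets import load_dataset

    dataset = load_dataset("SWE-Gym/SWE-Gym", split="train")
    shards = [dataset.shard(num_shards=20, index=i) for i in range(20)]
    # Shard sizes differ by at most one instance, so the pull work is spread roughly evenly.
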
148 changes: 131 additions & 17 deletions src/agents/nano_agent.py
@@ -13,10 +13,24 @@
logger = logging.getLogger(__name__)


def setup_env_common(env: Environment):
"""
Common setup steps that apply to all datasets.
This includes installing system packages and setting up git for commits.
"""
# Install ripgrep
env.run_shell("apt-get update && apt-get install -y ripgrep 2>/dev/null || true")

# Commit all changes to ensure we have a clean state
env.run_shell("git config --global user.email 'you@example.com'")
env.run_shell("git config --global user.name 'Your Name'")
env.run_shell("git add . && git commit -m 'add changes'")


def setup_env_swebench(env: Environment):
"""
Mimics the R2E-Gym setup function with both swebench and non-swebench paths.
Detects which type of image we're using and applies the appropriate setup.
SWE-bench specific environment setup.
Includes PATH configuration, conda env setup, and package installation.
"""
repo_path = "/testbed"
alt_path = "/root"
@@ -63,13 +77,82 @@ def setup_env_swebench(env: Environment):
# We will not use this script in nano-agent
# env.run_shell(f"ln -sf {alt_path}/r2e_tests {repo_path}/r2e_tests 2>/dev/null || true")

# Install ripgrep
env.run_shell("apt-get update && apt-get install -y ripgrep 2>/dev/null || true")
# Call common setup
setup_env_common(env)


def setup_env_swegym(env: Environment):
"""
SWE-Gym specific environment setup.
Only calls the common setup since SWE-Gym images are pre-configured.
"""
setup_env_common(env)


# Supported HuggingFace dataset names
SUPPORTED_DATASETS = {
"princeton-nlp/SWE-bench_Verified",
"SWE-Gym/SWE-Gym",
"SWE-Gym/SWE-Gym-Lite",
}


def _is_swegym_dataset(dataset_name: str) -> bool:
"""Check if the dataset name is a SWE-Gym dataset."""
return dataset_name.startswith("SWE-Gym/")


def _get_setup_fn(dataset_name: str):
"""
Get the appropriate setup function for the given dataset.

Args:
dataset_name: HuggingFace dataset name

Returns:
Setup function to use for the environment
"""
if _is_swegym_dataset(dataset_name):
return setup_env_swegym
elif dataset_name.startswith("princeton-nlp/SWE-bench"):
return setup_env_swebench
else:
raise ValueError(
f"Unsupported dataset: {dataset_name}. "
f"Supported datasets are: {', '.join(sorted(SUPPORTED_DATASETS))}"
)


def _construct_image_name(instance_id: str, dataset_name: str) -> str:
"""
Construct the Docker image name for Apptainer backend based on dataset type.

Args:
instance_id: The instance identifier from the dataset
dataset_name: HuggingFace dataset name (e.g., "princeton-nlp/SWE-bench_Verified" or "SWE-Gym/SWE-Gym")

Returns:
Full Docker image name (e.g., "docker.io/slimshetty/swebench-verified:sweb.eval.x86_64.{instance_id}")

Raises:
ValueError: If dataset_name is not one of the supported datasets
"""
if _is_swegym_dataset(dataset_name):
# SWE-Gym format: xingyaoww/sweb.eval.x86_64.{instance_id_with_underscores}
# Replace "__" with "_s_" in instance_id
transformed_id = instance_id.replace("__", "_s_").lower()
image_name = f"xingyaoww/sweb.eval.x86_64.{transformed_id}"
elif dataset_name.startswith("princeton-nlp/SWE-bench"):
# SWE-bench format: docker.io/slimshetty/swebench-verified:sweb.eval.x86_64.{instance_id}
image_name = f"docker.io/slimshetty/swebench-verified:sweb.eval.x86_64.{instance_id}"
else:
raise ValueError(
f"Unsupported dataset: {dataset_name}. "
f"Supported datasets are: {', '.join(sorted(SUPPORTED_DATASETS))}"
)

return image_name

# Commit all changes to ensure we have a clean state
env.run_shell("git config --global user.email 'you@example.com'")
env.run_shell("git config --global user.name 'Your Name'")
env.run_shell("git add . && git commit -m 'add changes'")

@dataclass
class NanoConfig:
@@ -90,7 +173,7 @@ class NanoConfig:
env: Optional[Any] = None


def _process_one(data: dict[str, Any], config: NanoConfig) -> dict[str, Any]:
def _process_one(data: dict[str, Any], config: NanoConfig, dataset_name: Optional[str] = None) -> dict[str, Any]:
assert "repo" in data and "base_commit" in data and "problem_statement" in data

logger.info(f"[START] {data['repo']} @ {data['base_commit'][:7]}")
@@ -111,9 +194,16 @@ def _process_one(data: dict[str, Any], config: NanoConfig) -> dict[str, Any]:
# Based on the reference, the caller (run_nano_eval) should likely construct the environment.
# However, `_process_one` is called per instance, and the environment depends on the instance ID.
instance_id = data.get("instance_id")
image_name = f"docker.io/slimshetty/swebench-verified:sweb.eval.x86_64.{instance_id}"
if not instance_id:
raise ValueError("instance_id is required when using apptainer backend")

if not dataset_name:
raise ValueError("dataset_name is required when using apptainer backend")

image_name = _construct_image_name(instance_id, dataset_name)
workdir = "/testbed"
env = ApptainerEnvironment(image=f"docker://{image_name}", workdir=workdir, setup_fn=setup_env_swebench)
setup_fn = _get_setup_fn(dataset_name)
env = ApptainerEnvironment(image=f"docker://{image_name}", workdir=workdir, setup_fn=setup_fn)
agent_kwargs["env"] = env
elif env:
agent_kwargs["env"] = env
Expand Down Expand Up @@ -180,14 +270,38 @@ def _process_one(data: dict[str, Any], config: NanoConfig) -> dict[str, Any]:
return result


def nano_rollout_func(data: list[dict[str, Any]], config: NanoConfig, **kwargs) -> list[dict[str, Any]]:
"""Deploys parallel Nano agents talking to our trl vllm-serve-async endpoint to process the given data"""
def nano_rollout_func(data: list[dict[str, Any]], config: NanoConfig, dataset_name: Optional[str] = None, **kwargs) -> list[dict[str, Any]]:
"""
Deploys parallel Nano agents talking to our trl vllm-serve-async endpoint to process the given data.

Args:
data: List of data dictionaries, each containing instance information
config: NanoConfig with agent configuration
dataset_name: HuggingFace dataset name (e.g., "princeton-nlp/SWE-bench_Verified" or "SWE-Gym/SWE-Gym").
Required when using apptainer backend. Must be one of the supported datasets.
**kwargs: Additional keyword arguments (ignored)

Returns:
List of result dictionaries, one per input data item

Raises:
ValueError: If dataset_name is not one of the supported datasets
"""
# Validate dataset_name if provided or if using apptainer backend
if dataset_name:
if not (dataset_name.startswith("princeton-nlp/SWE-bench") or _is_swegym_dataset(dataset_name)):
raise ValueError(
f"Unsupported dataset: {dataset_name}. "
f"Supported datasets are: {', '.join(sorted(SUPPORTED_DATASETS))}"
)
elif config.backend == "apptainer":
raise ValueError("dataset_name is required when using apptainer backend")

logger.info(f"Starting {len(data)} agent rollouts")
logger.info(f"Starting {len(data)} agent rollouts" + (f" with dataset {dataset_name}" if dataset_name else ""))
start_time = time.time()

with ThreadPoolExecutor(max_workers=min(len(data), os.cpu_count())) as executor:
results = list(executor.map(lambda datum: _process_one(datum, config), data))
results = list(executor.map(lambda datum: _process_one(datum, config, dataset_name), data))

logger.info(f"Finished {len(data)} rollouts in {time.time() - start_time:.2f}s")
return results
@@ -203,7 +317,7 @@ def nano_rollout_func(data: list[dict[str, Any]], config: NanoConfig, **kwargs)
runs = 1
data = get_swe_gym_repo_repair_dataset().shuffle(seed=42)

config = NanoConfig(model="hosted_vllm/Qwen/Qwen3-8B")
config = NanoConfig(model="hosted_vllm/Qwen/Qwen3-8B", backend="apptainer")

avg_times = []

@@ -214,7 +328,7 @@ def nano_rollout_func(data: list[dict[str, Any]], config: NanoConfig, **kwargs)
times = []
for i in range(runs):
start_time = time.time()
results = nano_rollout_func(subset_dicts, config)
results = nano_rollout_func(subset_dicts, config, dataset_name="SWE-Gym/SWE-Gym")
elapsed = time.time() - start_time
times.append(elapsed)
print(f" Run {i+1}: {elapsed:.2f}s")
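To make the new dispatch concrete, here is a minimal sketch of how _construct_image_name and _get_setup_fn resolve the two supported dataset families (the instance IDs are illustrative, and the import path is an assumption about the repository layout):

    # Illustrative only: instance IDs are hypothetical and the import path is an assumption.
    from src.agents.nano_agent import _construct_image_name, _get_setup_fn

    _construct_image_name("astropy__astropy-1234", "princeton-nlp/SWE-bench_Verified")
    # -> "docker.io/slimshetty/swebench-verified:sweb.eval.x86_64.astropy__astropy-1234"
    _construct_image_name("getmoto__moto-1234", "SWE-Gym/SWE-Gym")
    # -> "xingyaoww/sweb.eval.x86_64.getmoto_s_moto-1234"

    _get_setup_fn("SWE-Gym/SWE-Gym-Lite")              # -> setup_env_swegym
    _get_setup_fn("princeton-nlp/SWE-bench_Verified")  # -> setup_env_swebench
    _get_setup_fn("another-org/another-dataset")       # raises ValueError
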
1 change: 1 addition & 0 deletions src/conf/agent/nano.yaml
@@ -11,3 +11,4 @@ min_p: null
top_k: null
verbose: false
log: False
backend: "apptainer"
6 changes: 3 additions & 3 deletions src/curate_sft_data.py
@@ -54,11 +54,11 @@ class Config:
cs = ConfigStore.instance()
cs.store(name="base_curation_config", node=Config, group="")

def process_one_with_reward(problem_data: dict[str, Any], config: NanoConfig) -> dict[str, Any]:
def process_one_with_reward(problem_data: dict[str, Any], config: NanoConfig, dataset_name: str) -> dict[str, Any]:
"""
Helper function that wraps _process_one and calculates reward.
"""
result = _process_one(problem_data, config)
result = _process_one(problem_data, config, dataset_name)

# Calculate rewards using same approach as train_grpo
generated_diff = result["generated_diff"]
@@ -122,7 +122,7 @@ def main(cfg: Config) -> None:
agent_config = NanoConfig(**OmegaConf.to_container(cfg.agent, resolve=True))

with ThreadPoolExecutor(max_workers=cfg.curation.max_workers) as executor:
futures = [executor.submit(process_one_with_reward, task, agent_config) for task in all_rollout_tasks]
futures = [executor.submit(process_one_with_reward, task, agent_config, cfg.curation.input_dataset_name) for task in all_rollout_tasks]

for future in as_completed(futures):
completed_count += 1
5 changes: 4 additions & 1 deletion src/train_grpo.py
@@ -261,7 +261,10 @@ def main(cfg: Config) -> None:
# Convert OmegaConf to NanoConfig dataclass
agent_config = AgentConfig(**OmegaConf.to_container(cfg.agent, resolve=True))
if agent_config.agent_kind == "nano":
rollout_func = partial(nano_rollout_func, config=agent_config)
# Extract dataset_name from config for nano_rollout_func
# FIXME: the multilingual mixed config/dataset is not supported yet with the apptainer backend and will throw an error if used
dataset_name = cfg.run.dataset_name
rollout_func = partial(nano_rollout_func, config=agent_config, dataset_name=dataset_name)
elif agent_config.agent_kind == "mini":
register_model({
f"{cfg.model.model_name}": {
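Taken together with the nano.yaml default above, the rollout wiring for the apptainer backend ends up roughly as follows (a sketch with illustrative model and dataset values; the import path is an assumption):

    # Sketch of the resulting wiring; model and dataset values are illustrative.
    from functools import partial
    from src.agents.nano_agent import NanoConfig, nano_rollout_func

    agent_config = NanoConfig(model="hosted_vllm/Qwen/Qwen3-8B", backend="apptainer")
    rollout_func = partial(nano_rollout_func, config=agent_config, dataset_name="SWE-Gym/SWE-Gym")
    # Each rollout then resolves the matching Apptainer image and setup function per instance.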