1 change: 1 addition & 0 deletions pyproject.toml
@@ -40,6 +40,7 @@ dependencies = [
"tomli; python_version < '3.11'",
"prime-sandboxes>=0.1.0",
"wget>=3.2",
"torch>=2.8.0",
Member: prob leftover from smth else?

]

[dependency-groups]
105 changes: 100 additions & 5 deletions verifiers/envs/env_group.py
@@ -84,6 +84,88 @@ async def score_rollout(
return RolloutScore(reward=reward, metrics=metrics)


class EnvGroupSparseRubric(EnvGroupRubric):
"""
Enhanced EnvGroup rubric with domain-specific sparse tracking.

This rubric extends EnvGroupRubric to support sparse metrics for multi-domain environments.
When routing scoring to domain-specific environments, it automatically marks metrics
that weren't computed by the target environment as sparse (excluded from averaging).

Key differences from standard EnvGroupRubric:
- Marks uncomputed domain metrics as sparse (e.g., chemistry_reward=0.0 becomes sparse)
- Enables mathematically correct domain averaging by excluding irrelevant zeros
- Only used when EnvGroup is initialized with enable_sparse_metrics=True

Example: For a chemistry task in ProfBench, physics/finance/consulting rewards are marked
sparse, ensuring chemistry_reward averages only over actual chemistry evaluations.
"""

async def score_rollout(
self,
prompt: str | list[ChatMessage],
completion: str | list[ChatMessage],
answer: str = "",
state: State | None = None,
task: str = "default",
info: dict | None = None,
example_id: int | None = None,
**kwargs,
) -> RolloutScore:
"""
Route scoring with sparse metrics support for multi-domain environments.

This method handles scoring by:
1. Routing the task to the appropriate domain-specific environment
2. Computing metrics using that environment's rubric
3. Filling uncomputed metrics with 0.0 and marking them as sparse
4. Returning results with sparse flags for proper averaging

Only used when EnvGroup has enable_sparse_metrics=True.
"""
state = state or {}
info = info or {}

# pre-initialize all known metrics to 0.0
# this ensures consistent metric structure across all rollouts
# uncomputed metrics will remain 0.0 and be marked sparse
metrics = {name: 0.0 for name in self.all_reward_names}
reward = 0.0

# Route to appropriate domain environment based on task
env = self.env_map.get(task)
if env is None:
self.logger.warning(f"No environment found for task '{task}'")
return RolloutScore(reward=reward, metrics=metrics)

# Score using the domain-specific environment's rubric
# this computes only the metrics relevant to this domain
env_results = await env.rubric.score_rollout(
prompt, completion, answer, state, task, info, example_id, **kwargs
)

# update metrics with computed values from domain environment
# metrics not computed by this environment remain at 0.0
for reward_name, score in env_results.metrics.items():
if reward_name in metrics:
metrics[reward_name] = score

# mark uncomputed metrics as sparse for exclusion from averaging
# example: for chemistry task, physics/finance/consulting rewards marked sparse
# this enables mathematically correct domain averaging
uncomputed_metrics = set(self.all_reward_names) - set(
env_results.metrics.keys()
)
sparse_metrics = uncomputed_metrics if uncomputed_metrics else None

# Overall reward comes from the domain environment
reward = env_results.reward

return RolloutScore(
reward=reward, metrics=metrics, sparse_metrics=sparse_metrics
)


class EnvGroup(Environment):
"""
Environment group that acts as a mixture of multiple environments.
@@ -92,7 +174,11 @@ class EnvGroup(Environment):
"""

def __init__(
self, envs: list[Environment], env_names: list[str] | None = None, **kwargs
self,
envs: list[Environment],
env_names: list[str] | None = None,
enable_sparse_metrics: bool = False,
**kwargs,
):
"""
Initialize EnvGroup with a list of environments.
@@ -101,6 +187,7 @@ def __init__(
envs: list of Environment instances
env_names: Optional list of names for each environment.
If not provided, uses "env_0", "env_1", etc.
enable_sparse_metrics: Enable sparse metrics for mathematically correct domain averaging
**kwargs: Additional arguments passed to parent Environment
"""
if not envs:
@@ -134,10 +221,18 @@ def add_task(example):
eval_datasets.append(env_eval_dataset)
dataset = concatenate_datasets(datasets) if datasets else None
eval_dataset = concatenate_datasets(eval_datasets) if eval_datasets else None
# wrap rubrics
rubric = EnvGroupRubric(self.env_map)

# Don't set oai_tools at the group level since different sub-environments
# choose rubric type based on enable_sparse_metrics flag
# this is the key decision point for sparse metrics activation
if enable_sparse_metrics:
# use sparse-aware rubric that marks uncomputed domain metrics as sparse
# enables mathematically correct averaging by excluding irrelevant zeros
rubric = EnvGroupSparseRubric(self.env_map)
else:
# use standard rubric that includes all values in averaging (backwards compatible)
# this preserves existing behavior for environments without sparse metrics
rubric = EnvGroupRubric(self.env_map)

# don't set oai_tools at the group level since different sub-environments
# may have different tools. Instead, set them per-task in rollout().
# initialize parent Environment
super().__init__(
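To make the fill-and-mark step of EnvGroupSparseRubric concrete, here is a standalone sketch of the same logic in plain Python. The metric names and the 0.82 score are invented for illustration; nothing below depends on the library itself.

# Standalone sketch of the sparse-marking step in EnvGroupSparseRubric.score_rollout.
all_reward_names = ["chemistry_reward", "physics_reward", "finance_reward"]

# Suppose the task was routed to the chemistry environment, which only computed its own metric:
env_metrics = {"chemistry_reward": 0.82}

# Pre-initialize every known metric to 0.0, then overwrite the ones that were actually computed.
metrics = {name: 0.0 for name in all_reward_names}
for reward_name, score in env_metrics.items():
    if reward_name in metrics:
        metrics[reward_name] = score

# Everything the routed environment did not compute is marked sparse.
uncomputed = set(all_reward_names) - set(env_metrics)
sparse_metrics = uncomputed or None

print(metrics)         # {'chemistry_reward': 0.82, 'physics_reward': 0.0, 'finance_reward': 0.0}
print(sparse_metrics)  # {'physics_reward', 'finance_reward'} (set order may vary)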
39 changes: 39 additions & 0 deletions verifiers/envs/environment.py
@@ -605,6 +605,14 @@ async def generate(
if interleave_scoring and score_rollouts:
# interleaved pipeline: separate semaphores for generation and scoring
# pre-allocate metrics using known reward function names
reward_func_names = self.rubric.get_reward_func_names()
sparse_flags: dict[str, list[bool]] = {
name: [False] * n for name in reward_func_names
}
# ^^ initialize sparse tracking flags for each metric
# sparse_flags tracks which rollout values should be excluded from averaging
# Initially all values are marked as relevant (False = not sparse)
# Rubrics can mark specific rollouts as sparse during scoring
maybe_gen_sem = generation_semaphore or (
semaphore or await maybe_semaphore(gen_limit)
)
@@ -658,7 +666,19 @@ async def run_one(i: int) -> None:
# ensure key exists in case of EnvGroup/RubricGroup
if k not in results.metrics:
results.metrics[k] = [0.0] * n
sparse_flags[k] = [False] * n
# ^^ initialize sparse flags for dynamically discovered metrics
results.metrics[k][i] = v

# process sparse metric flags from rubric scoring
# when a rubric marks certain metrics as sparse for this rollout,
# we set the corresponding sparse flags to True to exclude them from averaging
if rs.sparse_metrics:
for sparse_key in rs.sparse_metrics:
if sparse_key not in sparse_flags:
# handle metrics marked sparse that weren't pre-allocated
sparse_flags[sparse_key] = [False] * n
sparse_flags[sparse_key][i] = True
num_completed += 1
if save_every > 0 and num_completed % save_every == 0:
self.logger.debug(f"Saving results to {results_path}")
@@ -718,6 +738,12 @@ async def run_one(i: int) -> None:
)
results.reward = rollout_scores.reward
results.metrics = rollout_scores.metrics
# pass through sparse_metrics if present
if (
hasattr(rollout_scores, "sparse_metrics")
and rollout_scores.sparse_metrics
):
results.sparse_metrics = rollout_scores.sparse_metrics
else:
results.reward = []
results.metrics = {}
@@ -739,6 +765,19 @@ async def run_one(i: int) -> None:
results.metadata.avg_reward = avg_reward
results.metadata.avg_metrics = avg_metrics

# conditionally add sparse tracking to results
# only include sparse_metrics if:
# 1. We're using interleaved scoring (where sparse tracking occurs)
# 2. Score rollouts is enabled (metrics are being computed)
# 3. At least one metric has sparse values (maintains backwards compatibility)
# this ensures existing environments without sparse metrics remain unchanged
if (
interleave_scoring
and score_rollouts
and any(any(flags) for flags in sparse_flags.values())
):
results.sparse_metrics = sparse_flags

return results

# alias for backward compatibility
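As a small illustration of the flag bookkeeping added to generate(), the following standalone snippet aggregates per-rollout sparse sets into the per-metric boolean lists that end up in results.sparse_metrics. The rollout data is hypothetical; the loop mirrors the logic in the diff above.

# Standalone sketch of the sparse-flag aggregation in generate() (hypothetical data).
n = 3  # number of rollouts
reward_func_names = ["chemistry_reward", "physics_reward"]

# One entry per rollout, taken from RolloutScore.sparse_metrics (empty set = nothing sparse).
per_rollout_sparse = [set(), {"physics_reward"}, {"physics_reward"}]

sparse_flags = {name: [False] * n for name in reward_func_names}
for i, sparse in enumerate(per_rollout_sparse):
    for key in sparse:
        if key not in sparse_flags:
            # handle metrics marked sparse that weren't pre-allocated
            sparse_flags[key] = [False] * n
        sparse_flags[key][i] = True

print(sparse_flags)
# {'chemistry_reward': [False, False, False], 'physics_reward': [False, True, True]}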
42 changes: 39 additions & 3 deletions verifiers/rubrics/rubric.py
@@ -262,11 +262,47 @@ async def score_rollouts(
return RolloutScores(
reward=[],
metrics={name: [] for name in reward_func_names},
# return sparse tracking only if needed (this is backwards compatible)
sparse_metrics={name: [] for name in reward_func_names}
if any(r.sparse_metrics for r in rewards if r.sparse_metrics)
else None,
)

# collect all possible metric keys across all rollouts
# this handles cases where different rollouts may have different metrics
# (e.g., multi-domain environments where some metrics don't apply to all tasks)
all_metric_keys = set()
for reward in rewards:
all_metric_keys.update(reward.metrics.keys())

# build unified metrics dict with sparse tracking
# ensures all metric keys are present in all rollout results, filling missing
# values with 0.0 and marking them as sparse (excluded from averaging)
metrics = {}
sparse_flags = {}
for k in all_metric_keys:
metrics[k] = []
sparse_flags[k] = []

for reward in rewards:
if k in reward.metrics:
# metric computed for this rollout - include the actual value
metrics[k].append(reward.metrics[k])
# check if rubric marked this metric as sparse for this rollout
is_sparse = reward.sparse_metrics and k in reward.sparse_metrics
sparse_flags[k].append(is_sparse)
else:
# metric not computed for this rollout - fill with sparse 0.0
# this handles domain-specific metrics that don't apply to all tasks
metrics[k].append(0.0)
sparse_flags[k].append(True)

return RolloutScores(
reward=[reward.reward for reward in rewards],
metrics={
k: [item.metrics[k] for item in rewards] for k in rewards[0].metrics
},
metrics=metrics,
# only include sparse_metrics if at least one metric has sparse values
# this maintains backwards compatibility - environments without sparse metrics get None
sparse_metrics=sparse_flags
if any(any(flags) for flags in sparse_flags.values())
else None,
)
15 changes: 15 additions & 0 deletions verifiers/types.py
@@ -86,20 +86,35 @@ class GenerateOutputs(BaseModel):
reward: list[float]
metrics: dict[str, list[float]] = Field(default_factory=dict)
metadata: GenerateMetadata
sparse_metrics: dict[str, list[bool]] | None = Field(default=None)
# ^^ optional sparse tracking for multi-domain environments
# When present, sparse_metrics[metric_name] indicates which rollout values should be
# excluded from averaging (e.g., domain-specific metrics evaluated on irrelevant tasks).
# True = sparse (exclude from average), False = relevant (include in average)
# Example: chemistry_reward=[50.0, 0.0, 75.0] with sparse_metrics={"chemistry_reward": [False, True, False]}
# would average to 62.5 instead of 41.7, excluding the irrelevant 0.0 score.


class RolloutScore(BaseModel):
"""Pydantic model for rollout scores."""

reward: float
metrics: dict[str, float] = Field(default_factory=dict)
sparse_metrics: set[str] | None = Field(default=None)
# ^^ set of metric names that should be excluded from averaging for this rollout
# Used by rubrics to mark domain-specific metrics as irrelevant for certain tasks
# Example: {"chemistry_reward", "physics_reward"} when evaluating a finance task


class RolloutScores(BaseModel):
"""Pydantic model for rubric outputs."""

reward: list[float]
metrics: dict[str, list[float]] = Field(default_factory=dict)
sparse_metrics: dict[str, list[bool]] | None = Field(default=None)
Member: i dont like the name sparse metrics here. to me, this implies that this is the actual float metrics after filtering. would prefer a name that is indicative of the fact that these are boolean flags, maybe smth like has_reward_fn (not sure)

# ^^ per-rollout exclusion flags for batch scoring
# Maps metric names to lists of boolean flags (True = sparse, False = relevant)
# Length matches the rollout lists in reward/metrics. Aggregated from individual RolloutScore.sparse_metrics


class ProcessedOutputs(BaseModel):
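For reference, a short sketch of the new fields in use. It assumes the models above are importable as verifiers.types (matching the file path shown); the task split and scores are invented.

# Sketch only: assumes the models defined above are importable from verifiers.types.
from verifiers.types import RolloutScore, RolloutScores

# Per-rollout score for a finance task: the chemistry/physics metrics do not apply.
single = RolloutScore(
    reward=0.9,
    metrics={"finance_reward": 0.9, "chemistry_reward": 0.0, "physics_reward": 0.0},
    sparse_metrics={"chemistry_reward", "physics_reward"},
)

# Batch-level scores: boolean flag lists aligned index-for-index with the metric value lists.
batch = RolloutScores(
    reward=[0.9],
    metrics={"finance_reward": [0.9], "chemistry_reward": [0.0], "physics_reward": [0.0]},
    sparse_metrics={"finance_reward": [False], "chemistry_reward": [True], "physics_reward": [True]},
)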
58 changes: 55 additions & 3 deletions verifiers/utils/eval_utils.py
@@ -89,10 +89,62 @@ def print_results(results: GenerateOutputs, num_samples: int = 1):
print(out)
for k in results.metrics:
v = results.metrics[k]
print(f"{k}: avg - {sum(v) / len(v):.3f}, std - {np.std(v):.3f}")

# selective averaging that excludes sparse values
# only average over relevant (non-sparse) values
# instead of including misleading zeros in the calculation
if (
hasattr(results, "sparse_metrics")
Member: this is always true?

and results.sparse_metrics
and k in results.sparse_metrics
):
# filter out sparse values from averaging calculation
# sparse_flags[i] = True means exclude rollout i from averaging
sparse_flags = results.sparse_metrics[k]
Member: ah yea look here you call it sparse_flags as well haha, this is alr better than sparse_metrics

relevant_values = [
val for val, is_sparse in zip(v, sparse_flags) if not is_sparse
]

if relevant_values:
# calculate statistics over only the relevant (non-sparse) values
# this gives mathematically correct domain-specific averages
avg = sum(relevant_values) / len(relevant_values)
std = np.std(relevant_values)
sparsity_info = f" (relevant: {len(relevant_values)}/{len(v)})"
print(f"{k}: avg - {avg:.3f}, std - {std:.3f}{sparsity_info}")
else:
# all values marked sparse - no relevant data to average
print(f"{k}: no relevant data (all values sparse)")
else:
# standard averaging for non-sparse metrics (backwards compatible)
# this preserves existing behavior for environments without sparse metrics
print(f"{k}: avg - {sum(v) / len(v):.3f}, std - {np.std(v):.3f}")

# enhanced rollout display that shows sparsity clearly
# Instead of showing misleading 0.0 values, display "-" for sparse metrics
# This makes it immediately obvious which rollouts are relevant vs excluded
for i in range(r):
# rounded to 3 decimal places
trials = [round(v[(i * n) + j], 3) for j in range(n)]
if (
hasattr(results, "sparse_metrics")
and results.sparse_metrics
and k in results.sparse_metrics
):
# For sparse metrics: "-" indicates sparse (irrelevant), numbers show actual values
# This visual distinction prevents confusion about which values contribute to averages
sparse_flags = results.sparse_metrics[k]
trials = []
for j in range(n):
idx = (i * n) + j
if sparse_flags[idx]:
# sparse value - show "-" instead of 0.0 to indicate exclusion from averaging
trials.append("-")
else:
# non-sparse value - show actual computed score
trials.append(round(v[idx], 3))
else:
# standard rollout printing for non-sparse metrics (backwards compatible)
# all values shown as numbers since none are excluded from averaging
trials = [round(v[(i * n) + j], 3) for j in range(n)]
out = f"r{i + 1}: {trials}"
print(out)

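To close, the arithmetic behind the selective average in print_results, using the chemistry_reward example from the comment in verifiers/types.py. The filtering expression is the same one used above; numpy is only needed for the std.

# Worked example of the selective average (values taken from the types.py comment above).
import numpy as np

v = [50.0, 0.0, 75.0]
sparse_flags = [False, True, False]  # the 0.0 came from a task where chemistry_reward does not apply

relevant_values = [val for val, is_sparse in zip(v, sparse_flags) if not is_sparse]

naive_avg = sum(v) / len(v)                               # 41.67 -- dragged down by the sparse 0.0
sparse_avg = sum(relevant_values) / len(relevant_values)  # 62.5  -- chemistry rollouts only
print(f"chemistry_reward: avg - {sparse_avg:.3f}, std - {np.std(relevant_values):.3f} "
      f"(relevant: {len(relevant_values)}/{len(v)})")
# chemistry_reward: avg - 62.500, std - 12.500 (relevant: 2/3)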