diff --git a/pyproject.toml b/pyproject.toml
index f0953afbc..81acfc07d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,7 @@ dependencies = [
     "tomli; python_version < '3.11'",
     "prime-sandboxes>=0.1.0",
     "wget>=3.2",
+    "torch>=2.8.0",
 ]

 [dependency-groups]
diff --git a/verifiers/envs/env_group.py b/verifiers/envs/env_group.py
index 9ab673fa6..db60b2a48 100644
--- a/verifiers/envs/env_group.py
+++ b/verifiers/envs/env_group.py
@@ -84,6 +84,88 @@ async def score_rollout(
         return RolloutScore(reward=reward, metrics=metrics)


+class EnvGroupSparseRubric(EnvGroupRubric):
+    """
+    Enhanced EnvGroup rubric with domain-specific sparse tracking.
+
+    This rubric extends EnvGroupRubric to support sparse metrics for multi-domain environments.
+    When routing scoring to domain-specific environments, it automatically marks metrics
+    that weren't computed by the target environment as sparse (excluded from averaging).
+
+    Key differences from standard EnvGroupRubric:
+    - Marks uncomputed domain metrics as sparse (e.g., chemistry_reward=0.0 becomes sparse)
+    - Enables mathematically correct domain averaging by excluding irrelevant zeros
+    - Only used when EnvGroup is initialized with enable_sparse_metrics=True
+
+    Example: For a chemistry task in ProfBench, physics/finance/consulting rewards are marked
+    sparse, ensuring chemistry_reward averages only over actual chemistry evaluations.
+    """
+
+    async def score_rollout(
+        self,
+        prompt: str | list[ChatMessage],
+        completion: str | list[ChatMessage],
+        answer: str = "",
+        state: State | None = None,
+        task: str = "default",
+        info: dict | None = None,
+        example_id: int | None = None,
+        **kwargs,
+    ) -> RolloutScore:
+        """
+        Route scoring with sparse metrics support for multi-domain environments.
+
+        This method handles scoring by:
+        1. Routing the task to the appropriate domain-specific environment
+        2. Computing metrics using that environment's rubric
+        3. Filling uncomputed metrics with 0.0 and marking them as sparse
+        4. Returning results with sparse flags for proper averaging
+
+        Only used when EnvGroup has enable_sparse_metrics=True.
+        """
+        state = state or {}
+        info = info or {}
+
+        # pre-initialize all known metrics to 0.0
+        # this ensures consistent metric structure across all rollouts
+        # uncomputed metrics will remain 0.0 and be marked sparse
+        metrics = {name: 0.0 for name in self.all_reward_names}
+        reward = 0.0
+
+        # Route to appropriate domain environment based on task
+        env = self.env_map.get(task)
+        if env is None:
+            self.logger.warning(f"No environment found for task '{task}'")
+            return RolloutScore(reward=reward, metrics=metrics)
+
+        # Score using the domain-specific environment's rubric
+        # this computes only the metrics relevant to this domain
+        env_results = await env.rubric.score_rollout(
+            prompt, completion, answer, state, task, info, example_id, **kwargs
+        )
+
+        # update metrics with computed values from domain environment
+        # metrics not computed by this environment remain at 0.0
+        for reward_name, score in env_results.metrics.items():
+            if reward_name in metrics:
+                metrics[reward_name] = score
+
+        # mark uncomputed metrics as sparse for exclusion from averaging
+        # example: for chemistry task, physics/finance/consulting rewards marked sparse
+        # this enables mathematically correct domain averaging
+        uncomputed_metrics = set(self.all_reward_names) - set(
+            env_results.metrics.keys()
+        )
+        sparse_metrics = uncomputed_metrics if uncomputed_metrics else None
+
+        # Overall reward comes from the domain environment
+        reward = env_results.reward
+
+        return RolloutScore(
+            reward=reward, metrics=metrics, sparse_metrics=sparse_metrics
+        )
+
+
 class EnvGroup(Environment):
     """
     Environment group that acts as a mixture of multiple environments.
@@ -92,7 +174,11 @@ class EnvGroup(Environment):
     """

     def __init__(
-        self, envs: list[Environment], env_names: list[str] | None = None, **kwargs
+        self,
+        envs: list[Environment],
+        env_names: list[str] | None = None,
+        enable_sparse_metrics: bool = False,
+        **kwargs,
     ):
         """
         Initialize EnvGroup with a list of environments.
@@ -101,6 +187,7 @@ def __init__(
             envs: list of Environment instances
             env_names: Optional list of names for each environment.
                 If not provided, uses "env_0", "env_1", etc.
+            enable_sparse_metrics: Enable sparse metrics for mathematically correct domain averaging
             **kwargs: Additional arguments passed to parent Environment
         """
         if not envs:
@@ -134,10 +221,18 @@ def add_task(example):
             eval_datasets.append(env_eval_dataset)
         dataset = concatenate_datasets(datasets) if datasets else None
         eval_dataset = concatenate_datasets(eval_datasets) if eval_datasets else None
-        # wrap rubrics
-        rubric = EnvGroupRubric(self.env_map)
-
-        # Don't set oai_tools at the group level since different sub-environments
+        # choose rubric type based on enable_sparse_metrics flag
+        # this is the key decision point for sparse metrics activation
+        if enable_sparse_metrics:
+            # use sparse-aware rubric that marks uncomputed domain metrics as sparse
+            # enables mathematically correct averaging by excluding irrelevant zeros
+            rubric = EnvGroupSparseRubric(self.env_map)
+        else:
+            # use standard rubric that includes all values in averaging (backwards compatible)
+            # this preserves existing behavior for environments without sparse metrics
+            rubric = EnvGroupRubric(self.env_map)
+
+        # don't set oai_tools at the group level since different sub-environments
         # may have different tools. Instead, set them per-task in rollout().
         # initialize parent Environment
         super().__init__(
diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py
index 9e9e39158..c68b7ad5b 100644
--- a/verifiers/envs/environment.py
+++ b/verifiers/envs/environment.py
@@ -605,6 +605,14 @@ async def generate(
         if interleave_scoring and score_rollouts:
             # interleaved pipeline: separate semaphores for generation and scoring
             # pre-allocate metrics using known reward function names
+            reward_func_names = self.rubric.get_reward_func_names()
+            sparse_flags: dict[str, list[bool]] = {
+                name: [False] * n for name in reward_func_names
+            }
+            # ^^ initialize sparse tracking flags for each metric
+            # sparse_flags tracks which rollout values should be excluded from averaging
+            # Initially all values are marked as relevant (False = not sparse)
+            # Rubrics can mark specific rollouts as sparse during scoring
             maybe_gen_sem = generation_semaphore or (
                 semaphore or await maybe_semaphore(gen_limit)
             )
@@ -658,7 +666,19 @@ async def run_one(i: int) -> None:
                     # ensure key exists in case of EnvGroup/RubricGroup
                     if k not in results.metrics:
                         results.metrics[k] = [0.0] * n
+                        sparse_flags[k] = [False] * n
+                        # ^^ initialize sparse flags for dynamically discovered metrics
                     results.metrics[k][i] = v
+
+                # process sparse metric flags from rubric scoring
+                # when a rubric marks certain metrics as sparse for this rollout,
+                # we set the corresponding sparse flags to True to exclude them from averaging
+                if rs.sparse_metrics:
+                    for sparse_key in rs.sparse_metrics:
+                        if sparse_key not in sparse_flags:
+                            # handle metrics marked sparse that weren't pre-allocated
+                            sparse_flags[sparse_key] = [False] * n
+                        sparse_flags[sparse_key][i] = True
                 num_completed += 1
                 if save_every > 0 and num_completed % save_every == 0:
                     self.logger.debug(f"Saving results to {results_path}")
@@ -718,6 +738,12 @@ async def run_one(i: int) -> None:
                 )
                 results.reward = rollout_scores.reward
                 results.metrics = rollout_scores.metrics
+                # pass through sparse_metrics if present
+                if (
+                    hasattr(rollout_scores, "sparse_metrics")
+                    and rollout_scores.sparse_metrics
+                ):
+                    results.sparse_metrics = rollout_scores.sparse_metrics
             else:
                 results.reward = []
                 results.metrics = {}
@@ -739,6 +765,19 @@ async def run_one(i: int) -> None:
         results.metadata.avg_reward = avg_reward
         results.metadata.avg_metrics = avg_metrics

+        # conditionally add sparse tracking to results
+        # only include sparse_metrics if:
+        # 1. We're using interleaved scoring (where sparse tracking occurs)
+        # 2. Score rollouts is enabled (metrics are being computed)
+        # 3. At least one metric has sparse values (maintains backwards compatibility)
+        # this ensures existing environments without sparse metrics remain unchanged
+        if (
+            interleave_scoring
+            and score_rollouts
+            and any(any(flags) for flags in sparse_flags.values())
+        ):
+            results.sparse_metrics = sparse_flags
+
         return results

 # alias for backward compatibility
diff --git a/verifiers/rubrics/rubric.py b/verifiers/rubrics/rubric.py
index 1fd0220b7..c0d08f789 100644
--- a/verifiers/rubrics/rubric.py
+++ b/verifiers/rubrics/rubric.py
@@ -262,11 +262,47 @@ async def score_rollouts(
             return RolloutScores(
                 reward=[],
                 metrics={name: [] for name in reward_func_names},
+                # return sparse tracking only if needed (this is backwards compatible)
+                sparse_metrics={name: [] for name in reward_func_names}
+                if any(r.sparse_metrics for r in rewards if r.sparse_metrics)
+                else None,
             )

+        # collect all possible metric keys across all rollouts
+        # this handles cases where different rollouts may have different metrics
+        # (e.g., multi-domain environments where some metrics don't apply to all tasks)
+        all_metric_keys = set()
+        for reward in rewards:
+            all_metric_keys.update(reward.metrics.keys())
+
+        # build unified metrics dict with sparse tracking
+        # ensures all metric keys are present in all rollout results, filling missing
+        # values with 0.0 and marking them as sparse (excluded from averaging)
+        metrics = {}
+        sparse_flags = {}
+        for k in all_metric_keys:
+            metrics[k] = []
+            sparse_flags[k] = []
+
+            for reward in rewards:
+                if k in reward.metrics:
+                    # metric computed for this rollout - include the actual value
+                    metrics[k].append(reward.metrics[k])
+                    # check if rubric marked this metric as sparse for this rollout
+                    is_sparse = bool(reward.sparse_metrics and k in reward.sparse_metrics)
+                    sparse_flags[k].append(is_sparse)
+                else:
+                    # metric not computed for this rollout - fill with sparse 0.0
+                    # this handles domain-specific metrics that don't apply to all tasks
+                    metrics[k].append(0.0)
+                    sparse_flags[k].append(True)
+
         return RolloutScores(
             reward=[reward.reward for reward in rewards],
-            metrics={
-                k: [item.metrics[k] for item in rewards] for k in rewards[0].metrics
-            },
+            metrics=metrics,
+            # only include sparse_metrics if at least one metric has sparse values
+            # this maintains backwards compatibility - environments without sparse metrics get None
+            sparse_metrics=sparse_flags
+            if any(any(flags) for flags in sparse_flags.values())
+            else None,
         )
diff --git a/verifiers/types.py b/verifiers/types.py
index daa3eb780..22b9bb7b2 100644
--- a/verifiers/types.py
+++ b/verifiers/types.py
@@ -86,6 +86,13 @@ class GenerateOutputs(BaseModel):
     reward: list[float]
     metrics: dict[str, list[float]] = Field(default_factory=dict)
     metadata: GenerateMetadata
+    sparse_metrics: dict[str, list[bool]] | None = Field(default=None)
+    # ^^ Optional sparse tracking for multi-domain environments
+    # When present, sparse_metrics[metric_name] indicates which rollout values should be
+    # excluded from averaging (e.g., domain-specific metrics evaluated on irrelevant tasks).
+    # True = sparse (exclude from average), False = relevant (include in average)
+    # Example: chemistry_reward=[50.0, 0.0, 75.0] with sparse_metrics={"chemistry_reward": [False, True, False]}
+    # would average to 62.5 instead of 41.7, excluding the irrelevant 0.0 score.


 class RolloutScore(BaseModel):
@@ -93,6 +100,10 @@ class RolloutScore(BaseModel):

     reward: float
     metrics: dict[str, float] = Field(default_factory=dict)
+    sparse_metrics: set[str] | None = Field(default=None)
+    # ^^ set of metric names that should be excluded from averaging for this rollout
+    # Used by rubrics to mark domain-specific metrics as irrelevant for certain tasks
+    # Example: {"chemistry_reward", "physics_reward"} when evaluating a finance task


 class RolloutScores(BaseModel):
@@ -100,6 +111,10 @@ class RolloutScores(BaseModel):

     reward: list[float]
     metrics: dict[str, list[float]] = Field(default_factory=dict)
+    sparse_metrics: dict[str, list[bool]] | None = Field(default=None)
+    # ^^ per-rollout exclusion flags for batch scoring
+    # Maps metric names to lists of boolean flags (True = sparse, False = relevant)
+    # Length matches the rollout lists in reward/metrics. Aggregated from individual RolloutScore.sparse_metrics


 class ProcessedOutputs(BaseModel):
diff --git a/verifiers/utils/eval_utils.py b/verifiers/utils/eval_utils.py
index 3c2c18d38..7211e6ec8 100644
--- a/verifiers/utils/eval_utils.py
+++ b/verifiers/utils/eval_utils.py
@@ -89,10 +89,62 @@ def print_results(results: GenerateOutputs, num_samples: int = 1):
         print(out)
     for k in results.metrics:
         v = results.metrics[k]
-        print(f"{k}: avg - {sum(v) / len(v):.3f}, std - {np.std(v):.3f}")
+
+        # selective averaging that excludes sparse values
+        # only average over relevant (non-sparse) values
+        # instead of including misleading zeros in the calculation
+        if (
+            hasattr(results, "sparse_metrics")
+            and results.sparse_metrics
+            and k in results.sparse_metrics
+        ):
+            # filter out sparse values from averaging calculation
+            # sparse_flags[i] = True means exclude rollout i from averaging
+            sparse_flags = results.sparse_metrics[k]
+            relevant_values = [
+                val for val, is_sparse in zip(v, sparse_flags) if not is_sparse
+            ]
+
+            if relevant_values:
+                # calculate statistics over only the relevant (non-sparse) values
+                # this gives mathematically correct domain-specific averages
+                avg = sum(relevant_values) / len(relevant_values)
+                std = np.std(relevant_values)
+                sparsity_info = f" (relevant: {len(relevant_values)}/{len(v)})"
+                print(f"{k}: avg - {avg:.3f}, std - {std:.3f}{sparsity_info}")
+            else:
+                # all values marked sparse - no relevant data to average
+                print(f"{k}: no relevant data (all values sparse)")
+        else:
+            # standard averaging for non-sparse metrics (backwards compatible)
+            # this preserves existing behavior for environments without sparse metrics
+            print(f"{k}: avg - {sum(v) / len(v):.3f}, std - {np.std(v):.3f}")
+
+        # enhanced rollout display that shows sparsity clearly
+        # Instead of showing misleading 0.0 values, display "-" for sparse metrics
+        # This makes it immediately obvious which rollouts are relevant vs excluded
         for i in range(r):
-            # rounded to 3 decimal places
-            trials = [round(v[(i * n) + j], 3) for j in range(n)]
+            if (
+                hasattr(results, "sparse_metrics")
+                and results.sparse_metrics
+                and k in results.sparse_metrics
+            ):
+                # For sparse metrics: "-" indicates sparse (irrelevant), numbers show actual values
+                # This visual distinction prevents confusion about which values contribute to averages
+                sparse_flags = results.sparse_metrics[k]
+                trials = []
+                for j in range(n):
+                    idx = (i * n) + j
+                    if sparse_flags[idx]:
+                        # sparse value - show "-" instead of 0.0 to indicate exclusion from averaging
+                        trials.append("-")
+                    else:
+                        # non-sparse value - show actual computed score
+                        trials.append(round(v[idx], 3))
+            else:
+                # standard rollout printing for non-sparse metrics (backwards compatible)
+                # all values shown as numbers since none are excluded from averaging
+                trials = [round(v[(i * n) + j], 3) for j in range(n)]
             out = f"r{i + 1}: {trials}"
             print(out)
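
Illustration (not part of the diff): a minimal sketch of how the sparse flags change the reported average, mirroring the chemistry_reward example in the GenerateOutputs comment and the selective averaging in print_results. The variable names and numbers here are hypothetical.

```python
import numpy as np

values = [50.0, 0.0, 75.0]     # hypothetical chemistry_reward across three rollouts
sparse = [False, True, False]  # rollout 2 was not a chemistry task

# naive average includes the irrelevant 0.0
naive_avg = sum(values) / len(values)                    # ~41.7

# sparse-aware average keeps only relevant rollouts
relevant = [v for v, s in zip(values, sparse) if not s]
sparse_avg = sum(relevant) / len(relevant)               # 62.5
sparse_std = float(np.std(relevant))                     # 12.5

print(f"naive: {naive_avg:.1f}, sparse-aware: {sparse_avg:.1f} "
      f"(relevant: {len(relevant)}/{len(values)})")
```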