200 changes: 200 additions & 0 deletions PR_DESCRIPTION.md
## Description

This PR consolidates three separate callbacks (`on_start`, `on_progress`, `on_log`) into a unified event-based system, addressing issue #755. The new system uses TypedDict events with Literal discriminators for type-safe pattern matching, making it cleaner, more extensible, and easier to maintain.

### Motivation

The previous callback system had grown brittle with three separate callback parameters that felt "tailor-made" for specific use cases. This PR:
- ✅ Replaces 3 callbacks with 1 unified `on_event` handler
- ✅ Adds support for PR #632 (GroupCompleteEvent with State objects)
- ✅ Provides infrastructure for #753 (log streaming via LogStreamEvent)
- ✅ Makes the system more extensible for future event types

### Before / After

**Before:**
```python
await env.evaluate(
client=client,
model="gpt-4",
on_start=lambda total: ...,
on_progress=lambda all_outs, new_outs: ...,
on_log=lambda msg: ...
)
```

**After:**
```python
async def on_event(event: EvalEvent):
match event["type"]:
case "start":
print(f"Starting: {event['num_examples']} examples")
case "progress":
print(f"Progress: {event['completed_count']}/{event['total_count']}")
case "complete":
print(f"Done! Avg reward: {event['avg_reward']}")

await env.evaluate(
client=client,
model="gpt-4",
on_event=on_event # Single unified handler!
)
```

## Type of Change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Test improvement

**Note:** This is a breaking change that removes `on_start`, `on_progress`, and `on_log` parameters. All internal usages have been migrated to the new event system.

## Changes

### New Event Types (`verifiers/types.py`)
- **StartEvent** - Emitted once at start with resolved `num_examples` and `rollouts_per_example`
- **ProgressEvent** - Emitted after each rollout/group completes with `all_outputs` and `new_outputs`
- **GroupCompleteEvent** - For grouped scoring, includes full `State` objects (addresses PR #632)
- **LogEvent** - For log messages with level, source, and timestamp
- **LogStreamEvent** - Infrastructure for streaming logs to files (#753)
- **SaveEvent** - When results are saved (intermediate or final)
- **CompleteEvent** - When generation finishes with timing and metrics

### Event Emission (`verifiers/envs/environment.py`)
- Updated `generate()` and `evaluate()` signatures to use `on_event` parameter
- Added `_emit_event()` helper using `maybe_await()` for sync/async handlers
- Added `_run_group_with_states()` internal method to preserve State objects for GroupCompleteEvent
- Events emitted at all key points: start, progress, group complete, save, log, complete
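
In outline, the helper pair can be sketched as follows (the names `maybe_await` and `_emit_event` come from the PR; the bodies below are an illustrative assumption, not the actual implementation):

```python
import asyncio
import inspect


async def maybe_await(value):
    """Await the value if a handler returned a coroutine; pass plain values through."""
    if inspect.isawaitable(value):
        return await value
    return value


async def _emit_event(on_event, event):
    """Dispatch one event to a sync or async handler; no-op when no handler is set."""
    if on_event is None:
        return
    await maybe_await(on_event(event))


# Demo: one sync and one async handler both receive the same event dict.
received = []

def sync_handler(event):
    received.append(("sync", event["type"]))

async def async_handler(event):
    received.append(("async", event["type"]))

async def main():
    await _emit_event(sync_handler, {"type": "start"})
    await _emit_event(async_handler, {"type": "start"})
    await _emit_event(None, {"type": "start"})  # safely ignored

asyncio.run(main())
print(received)
```

The `None` check up front is also what keeps event construction cheap when no handler is registered (see bug fix 5 below).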

### Event Consumption (`verifiers/utils/eval_utils.py`)
- Migrated `run_evaluations_tui()` to use `match/case` pattern for event handling
- All metric accumulation logic preserved
- Progress bar, display updates, and logging all driven by events

### Supporting Infrastructure
- **New file:** `verifiers/utils/event_utils.py` - LogStreamFileWriter for log tailing
- **Updated:** `verifiers/utils/eval_display.py` - Comments updated for new event system
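
The diff summary above doesn't show the writer's interface; a plausible minimal sketch of a `LogStreamFileWriter` (class name from the PR, methods assumed) that appends `LogStreamEvent` data to one tailable file per stream:

```python
import tempfile
from pathlib import Path


class LogStreamFileWriter:
    """Append streamed log chunks to one file per stream_id (interface assumed)."""

    def __init__(self, base_dir: Path):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def path_for(self, stream_id: str) -> Path:
        return self.base_dir / f"{stream_id}.log"

    def write(self, event: dict) -> Path:
        # Expects a LogStreamEvent-shaped dict with "stream_id" and "data" keys.
        path = self.path_for(event["stream_id"])
        with path.open("a", encoding="utf-8") as f:
            f.write(event["data"])
        return path


# Demo: two chunks for the same stream land in a single tailable file.
base = Path(tempfile.mkdtemp())
writer = LogStreamFileWriter(base)
writer.write({"type": "log_stream", "stream_id": "env-1", "data": "line 1\n"})
log_path = writer.write({"type": "log_stream", "stream_id": "env-1", "data": "line 2\n"})
print(log_path.read_text())
```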

## Testing

Comprehensive test coverage demonstrates the system works correctly:

### Unit Tests (`tests/test_event_system.py` - 10 tests)
- ✅ Event type structure validation
- ✅ LogStreamFileWriter functionality (file creation, appending, custom paths)
- ✅ All tests pass

### E2E Scenarios (`tests/test_event_system_e2e.py` - 4 scenarios)
Standalone executable script with realistic integration scenarios:
- ✅ Scenario 1: Simple independent scoring
- ✅ Scenario 2: Grouped scoring with multiple rollouts (tests GroupCompleteEvent)
- ✅ Scenario 3: Intermediate saves (tests SaveEvent emission)
- ✅ Scenario 4: Progress tracking with metrics
- ✅ All scenarios validate event order, data completeness, and counts
- Run with: `uv run python tests/test_event_system_e2e.py`

### Integration Testing
- [x] All existing tests pass when running `uv run pytest` locally (514 tests pass, 4 skipped external env tests)
- [x] New tests have been added to cover the changes (10 unit + 4 e2e + 6 bugfix + 2 immutability = 22 event system tests)
- [x] Verified with real `vf-eval` command - progress bar and TUI work correctly

### Manual Testing
Ran actual evaluation with `vf-eval` using a test environment:
```bash
$ uv run vf-eval test_config.toml
Processing 2 groups (2 total rollouts): 100%|██████████| 2/2 [00:00<00:00]
Evaluation completed in 1.94 seconds
```
✅ Progress bar updates correctly (requires ProgressEvent)
✅ Results display properly (requires CompleteEvent)

## Checklist

- [x] My code follows the style guidelines of this project as outlined in [AGENTS.md](https://github.com/PrimeIntellect-ai/verifiers/blob/main/AGENTS.md)
- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation (MEMORY.md added)
- [x] My changes generate no new warnings (only pre-existing experimental uv warnings)
- [x] Any dependent changes have been merged and published (N/A)

## Bug Fixes (Post-Review)

Six issues were identified during code review and have been fixed:

### 1. Server Mode Bypass (HIGH Priority)
**Problem:** When `independent_scoring=False`, grouped scoring bypassed the server mode dispatch in `run_group()`, causing failures in server mode.

**Fix:** Added server mode detection: in server mode, grouped scoring is routed through `run_group()` as before; in local mode, `_run_group_with_states()` is used so State objects are available for GroupCompleteEvent.

### 2. Missing Documentation (LOW Priority)
**Problem:** Changes to core user-facing methods weren't documented.

**Fix:** Added comprehensive event type documentation to `docs/reference.md` and usage examples to `docs/evaluation.md`.

**Test Coverage:** Added `tests/test_bugfix_event_system.py` with 6 tests covering server mode handling, num_examples calculation, and intermediate save events.

### 3. Intermediate SaveEvent Never Emitted (MEDIUM Priority)
**Problem:** The `generate()` method performed incremental saves after each completed task but never emitted SaveEvent with `is_intermediate=True`. The only SaveEvent emitted was for the final save with `is_intermediate=False`. This made the TUI handler's `case "save"` block dead code, since it only acts when `event["is_intermediate"]` is True.

**Fix:** Emit SaveEvent with `is_intermediate=True` after each incremental save in the main task loop. Now the TUI correctly displays checkpoint messages.
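
The behavior can be illustrated with a simplified stand-in for the save loop (the `emit` and `save_outputs` helpers here are illustrative, not the real code):

```python
import json
import tempfile
from pathlib import Path

events = []  # stand-in for an event collector

def emit(event):
    events.append(event)

def save_outputs(path: Path, outputs: list, *, is_intermediate: bool):
    path.write_text(json.dumps(outputs))
    # The fix: emit a SaveEvent for intermediate checkpoints too, not just the final save.
    emit({
        "type": "save",
        "path": path,
        "is_intermediate": is_intermediate,
        "output_count": len(outputs),
    })

out_path = Path(tempfile.mkdtemp()) / "results.json"
completed = []
for rollout in ["a", "b", "c"]:
    completed.append(rollout)
    save_outputs(out_path, completed, is_intermediate=True)  # incremental checkpoint
save_outputs(out_path, completed, is_intermediate=False)     # final save

intermediate = [e for e in events if e["is_intermediate"]]
print(len(intermediate))
```

With the fix, the TUI's `case "save"` branch now fires once per checkpoint instead of never.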

### 4. Mutable Reference in Events (MEDIUM Priority)
**Problem:** ProgressEvent and GroupCompleteEvent stored direct references to mutable lists (`builder.outputs`, `states`, `new_outputs`). When events were stored (e.g., in EventCollector), the lists would silently grow as more results were added, making `all_outputs` misleading.

**Fix:** Copy all list references when creating events: `list(builder.outputs)`, `list(states)`, `list(new_outputs)`.

**Test Coverage:** Added `tests/test_event_immutability.py` with 2 tests verifying events don't mutate after emission.
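
A self-contained demonstration of the bug and the fix, using simplified event shapes:

```python
outputs = []  # the builder's live, growing list

def make_event_buggy(all_outputs):
    # Bug: the event holds a reference to the live list.
    return {"type": "progress", "all_outputs": all_outputs}

def make_event_fixed(all_outputs):
    # Fix: snapshot the list at emission time.
    return {"type": "progress", "all_outputs": list(all_outputs)}

outputs.append("rollout-1")
buggy = make_event_buggy(outputs)
fixed = make_event_fixed(outputs)

outputs.append("rollout-2")  # generation continues after the events were stored

print(len(buggy["all_outputs"]))  # 2 — silently grew after emission
print(len(fixed["all_outputs"]))  # 1 — frozen at emission time
```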

### 5. Unnecessary O(N²) Event Construction (MEDIUM Priority)
**Problem:** The refactor changed `elif on_progress is not None:` to a bare `else:`, which unconditionally constructs ProgressEvent objects (including expensive list copies) even when `on_event=None`. Over a full run this amounts to O(N²) allocations, affecting production callers such as GEPA.

**Fix:** Use `elif on_event is not None:` to skip event construction when no handler is registered, matching the original callback pattern's performance characteristics.

### 6. Unused Parameter (LOW Priority)
**Problem:** The `configured_rollouts_per_example` parameter was added to `generate()` but never used. The existing logic, which counts distinct example IDs via `len(set(...))`, already calculates `num_examples` correctly.

**Fix:** Removed the unused parameter. The existing implementation is correct.

## Additional Notes

### Design Decisions

**Why TypedDict over dataclasses?**
- Matches existing patterns in the codebase
- JSON-serializable by default
- Works well with Literal discriminators for type-safe pattern matching
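
Because TypedDict instances are plain dicts at runtime, events round-trip through `json` with no custom encoder; a quick illustration using the `StartEvent` shape from this PR:

```python
import json
from typing import Literal, TypedDict


class StartEvent(TypedDict):
    type: Literal["start"]
    total_rollouts: int
    num_examples: int
    rollouts_per_example: int


event: StartEvent = {
    "type": "start",
    "total_rollouts": 40,
    "num_examples": 10,
    "rollouts_per_example": 4,
}

# A TypedDict is a plain dict at runtime, so json.dumps just works.
encoded = json.dumps(event)
decoded = json.loads(encoded)
print(decoded["type"], decoded["total_rollouts"])
```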

**Why break backward compatibility?**
- The previous callback system was acknowledged as "tailor-made/brittle" (issue #755)
- Clean break is simpler than maintaining an adapter layer
- All internal usages migrated in this PR
- Better to do it now than carry technical debt

**State Preservation Strategy**
- Created internal `_run_group_with_states()` method to return both State objects and outputs
- Public `run_group()` API remains unchanged (returns only outputs)
- GroupCompleteEvent receives State objects without breaking existing callers
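
In outline (signatures simplified, rollout logic replaced by stand-ins), the split looks like this:

```python
import asyncio


async def _run_group_with_states(example_id, rollouts):
    # Stand-in for the real rollout logic: produce State objects and outputs.
    states = [{"example_id": example_id, "reward": 1.0} for _ in range(rollouts)]
    outputs = [f"output-{example_id}-{i}" for i in range(rollouts)]
    return states, outputs


async def run_group(example_id, rollouts):
    # Public API unchanged: callers still receive only the outputs.
    _states, outputs = await _run_group_with_states(example_id, rollouts)
    return outputs


outs = asyncio.run(run_group(0, 2))
print(outs)
```

The environment can emit GroupCompleteEvent from the `_states` half internally while `run_group()` callers see no signature change.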

### Future Work

- Full subprocess log streaming implementation (infrastructure in place)
- Additional event types as needed (e.g., ErrorEvent for failures)
- TUI features that leverage State objects from GroupCompleteEvent

### Context

I'm relatively new to this codebase and the broader Prime Intellect ecosystem, so I focused on:
1. Understanding the existing patterns (TypedDict, maybe_await, etc.)
2. Following established conventions
3. Thorough testing to ensure no regressions
4. Clear documentation for future maintainers

Feedback welcome, especially on areas where I might have missed broader integration concerns!

## Related Issues

- Closes #755
- Addresses PR #632 (GroupCompleteEvent infrastructure)
- Partial implementation of #753 (log streaming infrastructure)
32 changes: 32 additions & 0 deletions docs/evaluation.md
This section explains how to run evaluations with Verifiers environments.
- [Output and Saving](#output-and-saving)
- [Resuming Evaluations](#resuming-evaluations)
- [Environment Defaults](#environment-defaults)
- [Programmatic Usage with Event Handlers](#programmatic-usage-with-event-handlers)
- [Multi-Environment Evaluation](#multi-environment-evaluation)
- [TOML Configuration](#toml-configuration)
- [Configuration Precedence](#configuration-precedence)
These defaults are used when higher-priority sources don't specify a value.

See [Configuration Precedence](#configuration-precedence) for more details on multi-environment evaluation.

## Programmatic Usage with Event Handlers

When using environments programmatically (not via the CLI), you can monitor generation progress by passing an `on_event` callback to `Environment.generate()` or `Environment.evaluate()`:

```python
import verifiers as vf

def handle_event(event):
if event["type"] == "start":
print(f"Starting {event['total_rollouts']} rollouts...")
elif event["type"] == "progress":
print(f"Progress: {event['completed_count']}/{event['total_count']}")
elif event["type"] == "group_complete":
# Access states for custom analysis
avg_reward = sum(s["reward"] for s in event["states"]) / len(event["states"])
print(f"Example {event['example_id']}: avg reward = {avg_reward:.3f}")
elif event["type"] == "complete":
print(f"Complete! Average reward: {event['avg_reward']:.3f}")

env = vf.load_environment("my-env")
outputs = await env.evaluate(
client=client,
model="gpt-4.1-mini",
num_examples=10,
rollouts_per_example=4,
on_event=handle_event,
)
```

The `on_event` parameter accepts both synchronous and asynchronous handlers. See the [Event Types](reference.md#event-types) reference for all available event types and their fields.
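
For example, an async handler can await real I/O between updates (the `asyncio.sleep` below stands in for work like posting metrics to a dashboard):

```python
import asyncio

progress_log = []

async def handle_event(event):
    if event["type"] == "progress":
        # Awaiting here is fine: generate()/evaluate() await async handlers.
        await asyncio.sleep(0)  # stand-in for real async work
        progress_log.append(f"{event['completed_count']}/{event['total_count']}")

# Demo invocation with a minimal progress-shaped event:
asyncio.run(handle_event({"type": "progress", "completed_count": 1, "total_count": 4}))
print(progress_log)
```

Pass it exactly as you would a sync handler: `on_event=handle_event`.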

## Multi-Environment Evaluation

You can evaluate multiple environments using `prime eval` with a TOML configuration file. This is useful for running comprehensive benchmark suites.
138 changes: 138 additions & 0 deletions docs/reference.md

- [Type Aliases](#type-aliases)
- [Data Types](#data-types)
- [Event Types](#event-types)
- [Classes](#classes)
- [Environment Classes](#environment-classes)
- [Parser Classes](#parser-classes)
```python
class RolloutScores(TypedDict):
    metrics: dict[str, list[float]]
```

### Event Types

Event types for monitoring generation progress via the `on_event` parameter in `Environment.generate()` and `Environment.evaluate()`.

#### EventHandler

```python
EventHandler = Callable[[EvalEvent], None] | Callable[[EvalEvent], Awaitable[None]]
```

Event handler callback type. Accepts both synchronous and asynchronous handlers.

#### EvalEvent

```python
EvalEvent = Union[
StartEvent,
ProgressEvent,
GroupCompleteEvent,
LogEvent,
LogStreamEvent,
SaveEvent,
CompleteEvent,
]
```

Union of all event types that can be emitted during generation/evaluation.

#### StartEvent

```python
class StartEvent(TypedDict):
type: Literal["start"]
total_rollouts: int
num_examples: int
rollouts_per_example: int
```

Emitted once at the start of generation with resolved counts.

#### ProgressEvent

```python
class ProgressEvent(TypedDict):
type: Literal["progress"]
all_outputs: list[RolloutOutput]
new_outputs: list[RolloutOutput]
completed_count: int
total_count: int
```

Emitted after each rollout or group completes.

#### GroupCompleteEvent

```python
class GroupCompleteEvent(TypedDict):
type: Literal["group_complete"]
example_id: int
states: list[State]
outputs: list[RolloutOutput]
```

Emitted when a group of rollouts for one example completes (only in non-independent scoring mode).

#### LogEvent

```python
class LogEvent(TypedDict):
type: Literal["log"]
message: str
level: Literal["debug", "info", "warning", "error"]
source: str
timestamp: float
```

Emitted for log messages from various sources.

#### LogStreamEvent

```python
class LogStreamEvent(TypedDict):
type: Literal["log_stream"]
stream_id: str
source: str
data: str
is_stderr: bool
file_path: Path | None
```

Emitted for streaming log data.

#### SaveEvent

```python
class SaveEvent(TypedDict):
type: Literal["save"]
path: Path
is_intermediate: bool
output_count: int
```

Emitted when results are saved to disk.

#### CompleteEvent

```python
class CompleteEvent(TypedDict):
type: Literal["complete"]
total_outputs: int
avg_reward: float
total_time_ms: float
```

Emitted when generation finishes.

**Example usage:**

```python
def handle_event(event: EvalEvent):
if event["type"] == "start":
print(f"Starting {event['total_rollouts']} rollouts...")
elif event["type"] == "progress":
print(f"Completed {event['completed_count']}/{event['total_count']}")
elif event["type"] == "group_complete":
print(f"Group {event['example_id']} complete with {len(event['states'])} rollouts")
elif event["type"] == "complete":
print(f"Done! Average reward: {event['avg_reward']:.3f}")

outputs = await env.generate(
inputs,
client=client,
model=model,
on_event=handle_event,
)
```

---

## Classes