diff --git a/configs/default_config.yaml b/configs/default_config.yaml index d33851ec0..f46106d1d 100644 --- a/configs/default_config.yaml +++ b/configs/default_config.yaml @@ -159,3 +159,14 @@ evaluator: # LLM-based feedback (experimental) use_llm_feedback: false # Use LLM to evaluate code quality llm_feedback_weight: 0.1 # Weight for LLM feedback in final score + +# Evolution trace configuration +# Logs detailed traces of program evolution for RL training and analysis +evolution_trace: + enabled: false # Enable evolution trace logging + format: 'jsonl' # Output format: 'jsonl', 'json', or 'hdf5' + include_code: false # Include full program code in traces + include_prompts: true # Include prompts and LLM responses + output_path: null # Path for trace output (defaults to output_dir/evolution_trace.{format}) + buffer_size: 10 # Number of traces to buffer before writing + compress: false # Compress output file (jsonl only) diff --git a/openevolve/config.py b/openevolve/config.py index efe38cea3..4a31f8d0d 100644 --- a/openevolve/config.py +++ b/openevolve/config.py @@ -313,6 +313,19 @@ class EvaluatorConfig: max_artifact_storage: int = 100 * 1024 * 1024 # 100MB per program +@dataclass +class EvolutionTraceConfig: + """Configuration for evolution trace logging""" + + enabled: bool = False + format: str = "jsonl" # Options: "jsonl", "json", "hdf5" + include_code: bool = False + include_prompts: bool = True + output_path: Optional[str] = None + buffer_size: int = 10 + compress: bool = False + + @dataclass class Config: """Master configuration for OpenEvolve""" @@ -330,6 +343,7 @@ class Config: prompt: PromptConfig = field(default_factory=PromptConfig) database: DatabaseConfig = field(default_factory=DatabaseConfig) evaluator: EvaluatorConfig = field(default_factory=EvaluatorConfig) + evolution_trace: EvolutionTraceConfig = field(default_factory=EvolutionTraceConfig) # Evolution settings diff_based_evolution: bool = True @@ -355,7 +369,7 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "Config": # Update top-level fields for key, value in config_dict.items(): - if key not in ["llm", "prompt", "database", "evaluator"] and hasattr(config, key): + if key not in ["llm", "prompt", "database", "evaluator", "evolution_trace"] and hasattr(config, key): setattr(config, key, value) # Update nested configs @@ -378,6 +392,8 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "Config": config.database.random_seed = config.random_seed if "evaluator" in config_dict: config.evaluator = EvaluatorConfig(**config_dict["evaluator"]) + if "evolution_trace" in config_dict: + config.evolution_trace = EvolutionTraceConfig(**config_dict["evolution_trace"]) return config @@ -446,6 +462,15 @@ def to_dict(self) -> Dict[str, Any]: "use_llm_feedback": self.evaluator.use_llm_feedback, "llm_feedback_weight": self.evaluator.llm_feedback_weight, }, + "evolution_trace": { + "enabled": self.evolution_trace.enabled, + "format": self.evolution_trace.format, + "include_code": self.evolution_trace.include_code, + "include_prompts": self.evolution_trace.include_prompts, + "output_path": self.evolution_trace.output_path, + "buffer_size": self.evolution_trace.buffer_size, + "compress": self.evolution_trace.compress, + }, # Evolution settings "diff_based_evolution": self.diff_based_evolution, "max_code_length": self.max_code_length, diff --git a/openevolve/controller.py b/openevolve/controller.py index d4c9ed214..2e1d67c0c 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -14,6 +14,7 @@ from openevolve.config 
import Config, load_config from openevolve.database import Program, ProgramDatabase from openevolve.evaluator import Evaluator +from openevolve.evolution_trace import EvolutionTracer from openevolve.llm.ensemble import LLMEnsemble from openevolve.prompt.sampler import PromptSampler from openevolve.process_parallel import ProcessParallelController @@ -163,6 +164,29 @@ def __init__( logger.info(f"Initialized OpenEvolve with {initial_program_path}") + # Initialize evolution tracer + if self.config.evolution_trace.enabled: + trace_output_path = self.config.evolution_trace.output_path + if not trace_output_path: + # Default to output_dir/evolution_trace.{format} + trace_output_path = os.path.join( + self.output_dir, + f"evolution_trace.{self.config.evolution_trace.format}" + ) + + self.evolution_tracer = EvolutionTracer( + output_path=trace_output_path, + format=self.config.evolution_trace.format, + include_code=self.config.evolution_trace.include_code, + include_prompts=self.config.evolution_trace.include_prompts, + enabled=True, + buffer_size=self.config.evolution_trace.buffer_size, + compress=self.config.evolution_trace.compress + ) + logger.info(f"Evolution tracing enabled: {trace_output_path}") + else: + self.evolution_tracer = None + # Initialize improved parallel processing components self.parallel_controller = None @@ -276,7 +300,7 @@ async def run( # Initialize improved parallel processing try: self.parallel_controller = ProcessParallelController( - self.config, self.evaluation_file, self.database + self.config, self.evaluation_file, self.database, self.evolution_tracer ) # Set up signal handlers for graceful shutdown @@ -319,6 +343,11 @@ def force_exit_handler(signum, frame): if self.parallel_controller: self.parallel_controller.stop() self.parallel_controller = None + + # Close evolution tracer + if self.evolution_tracer: + self.evolution_tracer.close() + logger.info("Evolution tracer closed") # Get the best program best_program = None diff --git a/openevolve/evolution_trace.py b/openevolve/evolution_trace.py new file mode 100644 index 000000000..7e5a74a2d --- /dev/null +++ b/openevolve/evolution_trace.py @@ -0,0 +1,564 @@ +""" +Evolution trace logging for OpenEvolve + +This module provides functionality to log detailed traces of program evolution, +capturing state-action-reward transitions for RL training and analysis. 
+""" + +import json +import logging +import time +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +from openevolve.utils.trace_export_utils import ( + append_trace_jsonl, + export_traces, + export_traces_json, +) + +logger = logging.getLogger(__name__) + + +@dataclass +class EvolutionTrace: + """Represents a single evolution trace entry""" + + iteration: int + timestamp: float + parent_id: str + child_id: str + parent_metrics: Dict[str, Any] + child_metrics: Dict[str, Any] + parent_code: Optional[str] = None + child_code: Optional[str] = None + code_diff: Optional[str] = None + prompt: Optional[Dict[str, str]] = None + llm_response: Optional[str] = None + improvement_delta: Optional[Dict[str, float]] = None + island_id: Optional[int] = None + generation: Optional[int] = None + artifacts: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert trace to dictionary format""" + return {k: v for k, v in asdict(self).items() if v is not None} + + def calculate_improvement(self) -> Dict[str, float]: + """Calculate improvement deltas between parent and child metrics""" + improvement = {} + for key in self.child_metrics: + if key in self.parent_metrics: + parent_val = self.parent_metrics[key] + child_val = self.child_metrics[key] + if isinstance(parent_val, (int, float)) and isinstance(child_val, (int, float)): + improvement[key] = child_val - parent_val + return improvement + + +class EvolutionTracer: + """Manages evolution trace logging with support for multiple formats""" + + def __init__( + self, + output_path: Optional[str] = None, + format: str = "jsonl", + include_code: bool = False, + include_prompts: bool = True, + enabled: bool = True, + buffer_size: int = 10, + compress: bool = False, + ): + """ + Initialize the evolution tracer + + Args: + output_path: Path to save trace data + format: Output format ('jsonl', 'json', 'hdf5') + include_code: Whether to include full code in traces + include_prompts: Whether to include prompts and LLM responses + enabled: Whether tracing is enabled + buffer_size: Number of traces to buffer before writing + compress: Whether to compress output files + """ + self.enabled = enabled + self.format = format + self.include_code = include_code + self.include_prompts = include_prompts + self.compress = compress + self.buffer_size = buffer_size + + # Track statistics + self.stats = { + "total_traces": 0, + "improvement_count": 0, + "total_improvement": {}, + "best_improvement": {}, + "worst_decline": {}, + } + + if not self.enabled: + logger.info("Evolution tracing is disabled") + return + + # Set up output path + if output_path: + self.output_path = Path(output_path) + else: + self.output_path = Path(f"evolution_trace.{format}") + + # Add compression extension if needed + if self.compress and format == "jsonl": + self.output_path = self.output_path.with_suffix(".jsonl.gz") + + # Create parent directory if needed + self.output_path.parent.mkdir(parents=True, exist_ok=True) + + # Initialize buffer for batched writing + self.buffer: List[EvolutionTrace] = [] + + # For JSON format, keep all traces in memory + if format == "json": + self.json_traces = [] + + logger.info(f"Evolution tracer initialized: {self.output_path}") + + def log_trace( + self, + iteration: int, + parent_program: Any, + child_program: Any, + prompt: Optional[Dict[str, str]] = None, + llm_response: Optional[str] = None, + artifacts: Optional[Dict[str, 
Any]] = None, + island_id: Optional[int] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Log an evolution trace entry + + Args: + iteration: Current iteration number + parent_program: Parent program object + child_program: Child program object + prompt: Prompt used for evolution + llm_response: LLM response + artifacts: Any artifacts from evaluation + island_id: Island ID if using island-based evolution + metadata: Additional metadata + """ + if not self.enabled: + return + + try: + # Create trace entry + trace = EvolutionTrace( + iteration=iteration, + timestamp=time.time(), + parent_id=parent_program.id, + child_id=child_program.id, + parent_metrics=parent_program.metrics, + child_metrics=child_program.metrics, + island_id=island_id, + generation=child_program.generation, + artifacts=artifacts, + metadata=metadata, + ) + + # Optionally include code + if self.include_code: + trace.parent_code = parent_program.code + trace.child_code = child_program.code + + # Optionally include prompts + if self.include_prompts: + trace.prompt = prompt + trace.llm_response = llm_response + + # Calculate improvement + trace.improvement_delta = trace.calculate_improvement() + + # Update statistics + self._update_stats(trace) + + # Add to buffer + self.buffer.append(trace) + + # For JSON format, also keep in memory + if self.format == "json": + self.json_traces.append(trace) + + # Write if buffer is full + if len(self.buffer) >= self.buffer_size: + self.flush() + + except Exception as e: + logger.error(f"Error logging evolution trace: {e}") + + def _update_stats(self, trace: EvolutionTrace): + """Update running statistics""" + self.stats["total_traces"] += 1 + + if trace.improvement_delta: + # Check if there's improvement in combined_score + if "combined_score" in trace.improvement_delta: + delta = trace.improvement_delta["combined_score"] + if delta > 0: + self.stats["improvement_count"] += 1 + + # Track cumulative improvements + for metric, delta in trace.improvement_delta.items(): + if metric not in self.stats["total_improvement"]: + self.stats["total_improvement"][metric] = 0 + self.stats["best_improvement"][metric] = delta + self.stats["worst_decline"][metric] = delta + + self.stats["total_improvement"][metric] += delta + + if delta > self.stats["best_improvement"][metric]: + self.stats["best_improvement"][metric] = delta + if delta < self.stats["worst_decline"][metric]: + self.stats["worst_decline"][metric] = delta + + def flush(self): + """Write buffered traces to file""" + if not self.enabled or not self.buffer: + return + + try: + if self.format == "jsonl": + # Append each trace to the JSONL file + for trace in self.buffer: + append_trace_jsonl(trace, self.output_path, compress=self.compress) + elif self.format == "json": + # For JSON, we'll write everything at close + pass # Traces already added to json_traces + elif self.format == "hdf5": + # For HDF5, we need to write everything at once + # So we'll keep accumulating until close + pass + + # Clear buffer after writing (except for formats that need full data) + if self.format == "jsonl": + self.buffer.clear() + + except Exception as e: + logger.error(f"Error flushing traces to file: {e}") + + def get_statistics(self) -> Dict[str, Any]: + """Get current tracing statistics""" + return { + **self.stats, + "improvement_rate": ( + self.stats["improvement_count"] / self.stats["total_traces"] + if self.stats["total_traces"] > 0 + else 0 + ), + } + + def close(self): + """Close the tracer and flush remaining data""" + if not 
self.enabled: + return + + # Flush any remaining traces + self.flush() + + # For JSON and HDF5, write everything at close + if self.format == "json" and hasattr(self, "json_traces"): + metadata = { + "created_at": time.time(), + "include_code": self.include_code, + "include_prompts": self.include_prompts, + "total_traces": len(self.json_traces), + } + export_traces_json(self.json_traces, self.output_path, metadata=metadata) + elif self.format == "hdf5": + # Export all buffered traces + all_traces = getattr(self, "json_traces", self.buffer) + if all_traces: + metadata = { + "created_at": time.time(), + "include_code": self.include_code, + "include_prompts": self.include_prompts, + } + export_traces(all_traces, self.output_path, format="hdf5", metadata=metadata) + + # Log final statistics + stats = self.get_statistics() + logger.info(f"Evolution tracing complete. Total traces: {stats['total_traces']}") + logger.info(f"Improvement rate: {stats['improvement_rate']:.2%}") + + if stats["best_improvement"]: + logger.info(f"Best improvements: {stats['best_improvement']}") + if stats["worst_decline"]: + logger.info(f"Worst declines: {stats['worst_decline']}") + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() + + +def extract_evolution_trace_from_checkpoint( + checkpoint_dir: Union[str, Path], + output_path: Optional[str] = None, + format: str = "jsonl", + include_code: bool = True, + include_prompts: bool = True, +) -> List[EvolutionTrace]: + """ + Extract evolution traces from an existing checkpoint directory + + Args: + checkpoint_dir: Path to checkpoint directory + output_path: Optional path to save extracted traces + format: Output format ('jsonl', 'json') + include_code: Whether to include code in traces + include_prompts: Whether to include prompts in traces + + Returns: + List of EvolutionTrace objects + """ + checkpoint_path = Path(checkpoint_dir) + if not checkpoint_path.exists(): + raise FileNotFoundError(f"Checkpoint directory {checkpoint_dir} not found") + + programs_dir = checkpoint_path / "programs" + if not programs_dir.exists(): + raise FileNotFoundError(f"Programs directory not found in {checkpoint_dir}") + + logger.info(f"Extracting evolution traces from {checkpoint_dir}") + + # Load all programs + programs = {} + program_files = list(programs_dir.glob("*.json")) + + for prog_file in program_files: + try: + with open(prog_file, "r") as f: + prog_data = json.load(f) + programs[prog_data["id"]] = prog_data + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"Error loading program from {prog_file}: {e}") + continue + + logger.info(f"Loaded {len(programs)} programs from checkpoint") + + # Build parent-child traces + traces = [] + for prog_id, prog in programs.items(): + # Skip programs without parents + parent_id = prog.get("parent_id") + if not parent_id or parent_id not in programs: + continue + + parent = programs[parent_id] + + # Create trace entry + trace = EvolutionTrace( + iteration=prog.get("iteration_found", 0), + timestamp=prog.get("timestamp", 0), + parent_id=parent_id, + child_id=prog_id, + parent_metrics=parent.get("metrics", {}), + child_metrics=prog.get("metrics", {}), + generation=prog.get("generation", 0), + island_id=prog.get("metadata", {}).get("island"), + metadata=prog.get("metadata", {}), + ) + + # Add code if requested + if include_code: + trace.parent_code = parent.get("code", "") + trace.child_code = prog.get("code", "") + + # Add 
prompts if available and requested + if include_prompts: + # Check for prompt data in the program + if "prompts" in prog: + # Prompts might be stored in the program data + trace.prompt = prog["prompts"].get("prompt") + trace.llm_response = prog["prompts"].get("response") + + # Calculate improvement + trace.improvement_delta = trace.calculate_improvement() + + traces.append(trace) + + # Sort traces by iteration + traces.sort(key=lambda x: (x.iteration, x.timestamp)) + + logger.info(f"Extracted {len(traces)} evolution traces") + + # Save to file if output path provided + if output_path: + metadata = { + "total_traces": len(traces), + "extracted_at": time.time(), + "source": "checkpoint", + } + export_traces(traces, output_path, format=format, metadata=metadata) + logger.info(f"Saved {len(traces)} traces to {output_path}") + + return traces + + +def extract_full_lineage_traces( + checkpoint_dir: Union[str, Path], output_path: Optional[str] = None, format: str = "json" +) -> List[Dict[str, Any]]: + """ + Extract complete evolution traces with full lineage chains and prompts/actions + + This function builds the complete evolution history for each program, + tracing back through all ancestors to create full lineage chains. + + Args: + checkpoint_dir: Path to checkpoint directory + output_path: Optional path to save extracted traces + format: Output format ('json' or 'jsonl') + + Returns: + List of lineage trace dictionaries + """ + checkpoint_path = Path(checkpoint_dir) + if not checkpoint_path.exists(): + raise FileNotFoundError(f"Checkpoint directory {checkpoint_dir} not found") + + programs_dir = checkpoint_path / "programs" + if not programs_dir.exists(): + raise FileNotFoundError(f"Programs directory not found in {checkpoint_dir}") + + logger.info(f"Extracting full lineage traces from {checkpoint_dir}") + + # Load all programs + programs = {} + program_files = list(programs_dir.glob("*.json")) + + for prog_file in program_files: + try: + with open(prog_file, "r") as f: + prog_data = json.load(f) + programs[prog_data["id"]] = prog_data + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"Error loading program from {prog_file}: {e}") + continue + + logger.info(f"Loaded {len(programs)} programs from checkpoint") + + # Build lineage traces for each program + traces = [] + + for program_id, program in programs.items(): + # Build lineage chain by tracing back through parents + lineage = [] + current = program + + while current: + lineage.append(current) + parent_id = current.get("parent_id") + current = programs.get(parent_id) if parent_id else None + + # Reverse to get chronological order (oldest to newest) + lineage.reverse() + + # Extract improvement steps with actions + improvements = [] + for i in range(len(lineage) - 1): + parent = lineage[i] + child = lineage[i + 1] + + # Extract the action (prompt and response) from child's data + prompts = child.get("prompts", {}) + action = None + + # Get the prompt used (could be diff_user, full_rewrite_user, or other templates) + for template_key, prompt_data in prompts.items(): + action = { + "template": template_key, + "system_prompt": prompt_data.get("system", ""), + "user_prompt": prompt_data.get("user", ""), + "llm_response": ( + prompt_data.get("responses", [""])[0] + if prompt_data.get("responses") + else "" + ), + } + break # Take the first prompt found + + # Calculate improvements for each metric + improvement_deltas = {} + if child.get("metrics") and parent.get("metrics"): + for metric in child["metrics"].keys(): + if metric in 
parent["metrics"]: + parent_val = parent["metrics"][metric] + child_val = child["metrics"][metric] + if isinstance(parent_val, (int, float)) and isinstance( + child_val, (int, float) + ): + improvement_deltas[metric] = child_val - parent_val + + improvement = { + "step": i, + "parent_id": parent["id"], + "child_id": child["id"], + "parent_metrics": parent.get("metrics", {}), + "child_metrics": child.get("metrics", {}), + "improvement": improvement_deltas, + "generation": child.get("generation", 0), + "iteration_found": child.get("iteration_found", 0), + "changes_description": child.get("metadata", {}).get("changes", ""), + "island_id": child.get("metadata", {}).get("island"), + "action": action, # The prompt/response that led to this improvement + } + improvements.append(improvement) + + # Only add traces that have improvement steps + if improvements: + trace = { + "final_program_id": program_id, + "final_metrics": program.get("metrics", {}), + "generation_depth": len(lineage), + "total_iterations": program.get("iteration_found", 0), + "improvement_steps": improvements, + "metadata": { + "language": program.get("language", ""), + "timestamp": program.get("timestamp", 0), + }, + } + traces.append(trace) + + # Sort traces by generation depth (most evolved first) + traces.sort(key=lambda x: x["generation_depth"], reverse=True) + + logger.info(f"Extracted {len(traces)} lineage traces") + + # Save to file if output path provided + if output_path: + if format == "json": + metadata = { + "total_traces": len(traces), + "extracted_at": time.time(), + "source": "checkpoint", + "type": "full_lineage", + } + export_traces_json(traces, output_path, metadata=metadata) + elif format == "jsonl": + # For JSONL, write each trace as a separate line + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w") as f: + for trace in traces: + json.dump(trace, f) + f.write("\n") + else: + raise ValueError(f"Unsupported format: {format}. 
Use 'json' or 'jsonl'") + + logger.info(f"Saved {len(traces)} lineage traces to {output_path}") + + return traces diff --git a/openevolve/process_parallel.py b/openevolve/process_parallel.py index 79788fbe8..04aa0db39 100644 --- a/openevolve/process_parallel.py +++ b/openevolve/process_parallel.py @@ -274,10 +274,11 @@ def _run_iteration_worker( class ProcessParallelController: """Controller for process-based parallel evolution""" - def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase): + def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase, evolution_tracer=None): self.config = config self.evaluation_file = evaluation_file self.database = database + self.evolution_tracer = evolution_tracer self.executor: Optional[ProcessPoolExecutor] = None self.shutdown_event = mp.Event() @@ -481,6 +482,28 @@ async def run_evolution( # Store artifacts if result.artifacts: self.database.store_artifacts(child_program.id, result.artifacts) + + # Log evolution trace + if self.evolution_tracer: + # Retrieve parent program for trace logging + parent_program = self.database.get(result.parent_id) if result.parent_id else None + if parent_program: + # Determine island ID + island_id = child_program.metadata.get("island", self.database.current_island) + + self.evolution_tracer.log_trace( + iteration=completed_iteration, + parent_program=parent_program, + child_program=child_program, + prompt=result.prompt, + llm_response=result.llm_response, + artifacts=result.artifacts, + island_id=island_id, + metadata={ + "iteration_time": result.iteration_time, + "changes": child_program.metadata.get("changes", ""), + } + ) # Log prompts if result.prompt: diff --git a/openevolve/utils/trace_export_utils.py b/openevolve/utils/trace_export_utils.py new file mode 100644 index 000000000..df033ae8b --- /dev/null +++ b/openevolve/utils/trace_export_utils.py @@ -0,0 +1,369 @@ +""" +Utilities for exporting evolution traces to various formats +""" + +import json +import logging +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +logger = logging.getLogger(__name__) + + +def export_traces_jsonl(traces: List[Any], output_path: Union[str, Path], compress: bool = False) -> None: + """ + Export traces to JSONL format (one JSON object per line) + + Args: + traces: List of trace objects with to_dict() method + output_path: Path to save the JSONL file + compress: Whether to compress the output with gzip + """ + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + if compress: + import gzip + if not output_path.suffix == '.gz': + output_path = output_path.with_suffix(output_path.suffix + '.gz') + open_func = gzip.open + mode = 'wt' + else: + open_func = open + mode = 'w' + + with open_func(output_path, mode) as f: + for trace in traces: + trace_dict = trace.to_dict() if hasattr(trace, 'to_dict') else trace + json.dump(trace_dict, f) + f.write('\n') + + logger.info(f"Exported {len(traces)} traces to {output_path}") + + +def export_traces_json(traces: List[Any], output_path: Union[str, Path], metadata: Optional[Dict[str, Any]] = None) -> None: + """ + Export traces to JSON format with metadata + + Args: + traces: List of trace objects with to_dict() method + output_path: Path to save the JSON file + metadata: Optional metadata to include in the output + """ + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Convert traces to dictionaries + trace_dicts = [] + for 
trace in traces: + if hasattr(trace, 'to_dict'): + trace_dicts.append(trace.to_dict()) + else: + trace_dicts.append(trace) + + # Build output structure + output_data = { + "metadata": metadata or {}, + "traces": trace_dicts + } + + # Add default metadata + output_data["metadata"].setdefault("total_traces", len(trace_dicts)) + output_data["metadata"].setdefault("exported_at", time.time()) + + with open(output_path, 'w') as f: + json.dump(output_data, f, indent=2) + + logger.info(f"Exported {len(traces)} traces to {output_path}") + + +def export_traces_hdf5(traces: List[Any], output_path: Union[str, Path], metadata: Optional[Dict[str, Any]] = None) -> None: + """ + Export traces to HDF5 format + + Args: + traces: List of trace objects with to_dict() method + output_path: Path to save the HDF5 file + metadata: Optional metadata to include in the output + """ + try: + import h5py + import numpy as np + except ImportError: + logger.error("h5py is required for HDF5 export. Install with: pip install h5py") + raise ImportError("h5py not installed") + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + with h5py.File(output_path, 'w') as f: + # Create groups + traces_group = f.create_group('traces') + meta_group = f.create_group('metadata') + + # Add metadata + if metadata: + for key, value in metadata.items(): + if isinstance(value, (str, int, float, bool)): + meta_group.attrs[key] = value + else: + meta_group.attrs[key] = json.dumps(value) + + meta_group.attrs['total_traces'] = len(traces) + meta_group.attrs['exported_at'] = time.time() + + # Add traces + for i, trace in enumerate(traces): + trace_dict = trace.to_dict() if hasattr(trace, 'to_dict') else trace + trace_group = traces_group.create_group(f'trace_{i:06d}') + + for key, value in trace_dict.items(): + if value is None: + continue + + if isinstance(value, dict): + # Store dictionaries as JSON strings in attributes + trace_group.attrs[key] = json.dumps(value) + elif isinstance(value, list): + # Convert lists to numpy arrays if possible + try: + arr = np.array(value) + trace_group.create_dataset(key, data=arr) + except (ValueError, TypeError): + # Fall back to JSON for complex lists + trace_group.attrs[key] = json.dumps(value) + elif isinstance(value, str): + # Store strings as attributes + trace_group.attrs[key] = value + elif isinstance(value, (int, float, bool)): + # Store scalars as attributes + trace_group.attrs[key] = value + else: + # Store other types as JSON + trace_group.attrs[key] = json.dumps(value) + + logger.info(f"Exported {len(traces)} traces to {output_path}") + + +def append_trace_jsonl(trace: Any, output_path: Union[str, Path], compress: bool = False) -> None: + """ + Append a single trace to a JSONL file + + Args: + trace: Trace object with to_dict() method + output_path: Path to the JSONL file + compress: Whether the file is compressed with gzip + """ + output_path = Path(output_path) + + if compress: + import gzip + if not output_path.suffix == '.gz': + output_path = output_path.with_suffix(output_path.suffix + '.gz') + open_func = gzip.open + mode = 'at' + else: + open_func = open + mode = 'a' + + trace_dict = trace.to_dict() if hasattr(trace, 'to_dict') else trace + + with open_func(output_path, mode) as f: + json.dump(trace_dict, f) + f.write('\n') + + +def load_traces_jsonl(input_path: Union[str, Path], compress: bool = False) -> List[Dict[str, Any]]: + """ + Load traces from a JSONL file + + Args: + input_path: Path to the JSONL file + compress: Whether the file is 
compressed with gzip + + Returns: + List of trace dictionaries + """ + input_path = Path(input_path) + + if compress or input_path.suffix == '.gz': + import gzip + open_func = gzip.open + mode = 'rt' + else: + open_func = open + mode = 'r' + + traces = [] + with open_func(input_path, mode) as f: + for line in f: + if line.strip(): + traces.append(json.loads(line)) + + return traces + + +def load_traces_json(input_path: Union[str, Path]) -> tuple[List[Dict[str, Any]], Dict[str, Any]]: + """ + Load traces from a JSON file + + Args: + input_path: Path to the JSON file + + Returns: + Tuple of (traces list, metadata dict) + """ + with open(input_path, 'r') as f: + data = json.load(f) + + traces = data.get('traces', []) + metadata = data.get('metadata', {}) + + return traces, metadata + + +def load_traces_hdf5(input_path: Union[str, Path]) -> tuple[List[Dict[str, Any]], Dict[str, Any]]: + """ + Load traces from an HDF5 file + + Args: + input_path: Path to the HDF5 file + + Returns: + Tuple of (traces list, metadata dict) + """ + try: + import h5py + except ImportError: + logger.error("h5py is required for HDF5 loading. Install with: pip install h5py") + raise ImportError("h5py not installed") + + traces = [] + metadata = {} + + with h5py.File(input_path, 'r') as f: + # Load metadata + if 'metadata' in f: + meta_group = f['metadata'] + for key in meta_group.attrs: + value = meta_group.attrs[key] + # Try to parse JSON strings + if isinstance(value, str) and value.startswith('{'): + try: + metadata[key] = json.loads(value) + except json.JSONDecodeError: + metadata[key] = value + else: + metadata[key] = value + + # Load traces + if 'traces' in f: + traces_group = f['traces'] + for trace_name in sorted(traces_group.keys()): + trace_group = traces_group[trace_name] + trace_dict = {} + + # Load attributes + for key in trace_group.attrs: + value = trace_group.attrs[key] + # Try to parse JSON strings + if isinstance(value, str) and (value.startswith('{') or value.startswith('[')): + try: + trace_dict[key] = json.loads(value) + except json.JSONDecodeError: + trace_dict[key] = value + else: + trace_dict[key] = value + + # Load datasets + for key in trace_group.keys(): + dataset = trace_group[key] + trace_dict[key] = dataset[...].tolist() + + traces.append(trace_dict) + + return traces, metadata + + +def export_traces( + traces: List[Any], + output_path: Union[str, Path], + format: str = "jsonl", + compress: bool = False, + metadata: Optional[Dict[str, Any]] = None +) -> None: + """ + Export traces to specified format + + Args: + traces: List of trace objects + output_path: Path to save the file + format: Output format ('jsonl', 'json', 'hdf5') + compress: Whether to compress output (jsonl only) + metadata: Optional metadata (json and hdf5 only) + """ + format = format.lower() + + if format == "jsonl": + export_traces_jsonl(traces, output_path, compress=compress) + elif format == "json": + export_traces_json(traces, output_path, metadata=metadata) + elif format == "hdf5": + export_traces_hdf5(traces, output_path, metadata=metadata) + else: + raise ValueError(f"Unsupported format: {format}. Use 'jsonl', 'json', or 'hdf5'") + + +def load_traces( + input_path: Union[str, Path], + format: Optional[str] = None +) -> Union[List[Dict[str, Any]], tuple[List[Dict[str, Any]], Dict[str, Any]]]: + """ + Load traces from file, auto-detecting format if not specified + + Args: + input_path: Path to the file + format: Optional format ('jsonl', 'json', 'hdf5'). Auto-detected if None. 
+ + Returns: + For JSONL: List of trace dictionaries + For JSON/HDF5: Tuple of (traces list, metadata dict) + """ + input_path = Path(input_path) + + # Auto-detect format from extension + if format is None: + if input_path.suffix in ['.jsonl', '.gz']: + format = 'jsonl' + elif input_path.suffix == '.json': + format = 'json' + elif input_path.suffix in ['.h5', '.hdf5']: + format = 'hdf5' + else: + # Try to detect from content + with open(input_path, 'rb') as f: + first_bytes = f.read(10) + if first_bytes.startswith(b'\x89HDF'): + format = 'hdf5' + elif first_bytes.startswith(b'{'): + # Could be JSON or JSONL, check for newlines + f.seek(0) + content = f.read(1000) + if b'\n{' in content or b'\n[' in content: + format = 'jsonl' + else: + format = 'json' + else: + format = 'jsonl' # Default assumption + + format = format.lower() + + if format == 'jsonl': + return load_traces_jsonl(input_path, compress=input_path.suffix == '.gz') + elif format == 'json': + return load_traces_json(input_path) + elif format == 'hdf5': + return load_traces_hdf5(input_path) + else: + raise ValueError(f"Unsupported format: {format}") \ No newline at end of file diff --git a/tests/test_evolution_trace.py b/tests/test_evolution_trace.py new file mode 100644 index 000000000..a4c9d97c9 --- /dev/null +++ b/tests/test_evolution_trace.py @@ -0,0 +1,654 @@ +""" +Tests for evolution trace functionality +""" + +import json +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +from openevolve.evolution_trace import ( + EvolutionTrace, + EvolutionTracer, + extract_evolution_trace_from_checkpoint, + extract_full_lineage_traces +) +from openevolve.database import Program + + +class TestEvolutionTrace(unittest.TestCase): + """Test the EvolutionTrace dataclass""" + + def test_trace_creation(self): + """Test creating an evolution trace entry""" + trace = EvolutionTrace( + iteration=1, + timestamp=1234567890.0, + parent_id="parent-123", + child_id="child-456", + parent_metrics={"score": 0.5, "accuracy": 0.8}, + child_metrics={"score": 0.6, "accuracy": 0.85} + ) + + self.assertEqual(trace.iteration, 1) + self.assertEqual(trace.parent_id, "parent-123") + self.assertEqual(trace.child_id, "child-456") + + def test_calculate_improvement(self): + """Test improvement calculation""" + trace = EvolutionTrace( + iteration=1, + timestamp=1234567890.0, + parent_id="parent-123", + child_id="child-456", + parent_metrics={"score": 0.5, "accuracy": 0.8, "label": "good"}, + child_metrics={"score": 0.6, "accuracy": 0.75, "label": "better"} + ) + + improvement = trace.calculate_improvement() + + self.assertAlmostEqual(improvement["score"], 0.1) + self.assertAlmostEqual(improvement["accuracy"], -0.05) + self.assertNotIn("label", improvement) # Non-numeric values excluded + + def test_to_dict(self): + """Test conversion to dictionary""" + trace = EvolutionTrace( + iteration=1, + timestamp=1234567890.0, + parent_id="parent-123", + child_id="child-456", + parent_metrics={"score": 0.5}, + child_metrics={"score": 0.6}, + parent_code="def f(): pass", + island_id=2 + ) + + trace_dict = trace.to_dict() + + self.assertIn("iteration", trace_dict) + self.assertIn("parent_code", trace_dict) + self.assertIn("island_id", trace_dict) + self.assertNotIn("llm_response", trace_dict) # None values excluded + + +class TestEvolutionTracer(unittest.TestCase): + """Test the EvolutionTracer class""" + + def setUp(self): + """Set up test fixtures""" + self.temp_dir = tempfile.mkdtemp() + self.parent_program = 
Program( + id="parent-123", + code="def f(): return 1", + language="python", + metrics={"score": 0.5, "accuracy": 0.8}, + generation=1 + ) + self.child_program = Program( + id="child-456", + code="def f(): return 2", + language="python", + parent_id="parent-123", + metrics={"score": 0.6, "accuracy": 0.85}, + generation=2 + ) + + def tearDown(self): + """Clean up test fixtures""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_tracer_disabled(self): + """Test that disabled tracer does nothing""" + tracer = EvolutionTracer(enabled=False) + + # Should not create any files or log anything + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program + ) + + stats = tracer.get_statistics() + self.assertEqual(stats["total_traces"], 0) + + def test_jsonl_format(self): + """Test JSONL format output""" + output_path = Path(self.temp_dir) / "trace.jsonl" + + tracer = EvolutionTracer( + output_path=str(output_path), + format="jsonl", + include_code=True, + buffer_size=1 # Write immediately + ) + + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program, + prompt={"system": "Test prompt"}, + island_id=0 + ) + + # Check file was created + self.assertTrue(output_path.exists()) + + # Read and verify content + with open(output_path, "r") as f: + line = f.readline() + data = json.loads(line) + + self.assertEqual(data["iteration"], 1) + self.assertEqual(data["parent_id"], "parent-123") + self.assertEqual(data["child_id"], "child-456") + self.assertEqual(data["parent_code"], "def f(): return 1") + self.assertEqual(data["island_id"], 0) + + tracer.close() + + def test_json_format(self): + """Test JSON format output""" + output_path = Path(self.temp_dir) / "trace.json" + + tracer = EvolutionTracer( + output_path=str(output_path), + format="json", + include_prompts=False, + buffer_size=1 + ) + + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program + ) + + tracer.log_trace( + iteration=2, + parent_program=self.child_program, + child_program=self.parent_program # Reverse for testing + ) + + tracer.close() + + # Read and verify content + with open(output_path, "r") as f: + data = json.load(f) + + self.assertIn("metadata", data) + self.assertIn("traces", data) + self.assertEqual(len(data["traces"]), 2) + self.assertEqual(data["traces"][0]["iteration"], 1) + self.assertEqual(data["traces"][1]["iteration"], 2) + + def test_statistics(self): + """Test statistics tracking""" + tracer = EvolutionTracer( + output_path=Path(self.temp_dir) / "trace.jsonl", + buffer_size=10 + ) + + # Log improvement + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program + ) + + # Log decline + tracer.log_trace( + iteration=2, + parent_program=self.child_program, + child_program=self.parent_program + ) + + stats = tracer.get_statistics() + + self.assertEqual(stats["total_traces"], 2) + self.assertIn("total_improvement", stats) + self.assertIn("best_improvement", stats) + self.assertIn("worst_decline", stats) + self.assertIn("improvement_rate", stats) + + # Check improvement tracking + self.assertAlmostEqual(stats["best_improvement"]["score"], 0.1) + self.assertAlmostEqual(stats["worst_decline"]["score"], -0.1) + + tracer.close() + + def test_buffer_flushing(self): + """Test buffer flushing behavior""" + output_path = Path(self.temp_dir) / "trace.jsonl" + + tracer = EvolutionTracer( + 
output_path=str(output_path), + format="jsonl", + buffer_size=2 + ) + + # First trace - should not write yet + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program + ) + + # File should not exist yet + self.assertFalse(output_path.exists()) + + # Second trace - should trigger flush + tracer.log_trace( + iteration=2, + parent_program=self.parent_program, + child_program=self.child_program + ) + + # File should now exist + self.assertTrue(output_path.exists()) + + # Verify two lines written + with open(output_path, "r") as f: + lines = f.readlines() + self.assertEqual(len(lines), 2) + + tracer.close() + + def test_context_manager(self): + """Test using tracer as context manager""" + output_path = Path(self.temp_dir) / "trace.jsonl" + + with EvolutionTracer( + output_path=str(output_path), + buffer_size=10 + ) as tracer: + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program + ) + + # File should exist after context exit (close called) + self.assertTrue(output_path.exists()) + + def test_artifact_logging(self): + """Test logging with artifacts""" + output_path = Path(self.temp_dir) / "trace.jsonl" + + tracer = EvolutionTracer( + output_path=str(output_path), + include_code=False, + buffer_size=1 + ) + + artifacts = { + "execution_output": "Result: 42", + "error": None, + "timing": 0.123 + } + + tracer.log_trace( + iteration=1, + parent_program=self.parent_program, + child_program=self.child_program, + artifacts=artifacts + ) + + # Read and verify artifacts are included + with open(output_path, "r") as f: + data = json.loads(f.readline()) + + self.assertIn("artifacts", data) + self.assertEqual(data["artifacts"]["execution_output"], "Result: 42") + self.assertEqual(data["artifacts"]["timing"], 0.123) + + tracer.close() + + +class TestEvolutionTraceIntegration(unittest.TestCase): + """Test integration with OpenEvolve configuration""" + + def test_config_integration(self): + """Test that evolution trace config is properly integrated""" + from openevolve.config import Config, EvolutionTraceConfig + + # Create config with evolution trace enabled + config = Config() + config.evolution_trace = EvolutionTraceConfig( + enabled=True, + format="jsonl", + include_code=True, + include_prompts=False, + output_path="/tmp/test_trace.jsonl" + ) + + # Verify configuration + self.assertTrue(config.evolution_trace.enabled) + self.assertEqual(config.evolution_trace.format, "jsonl") + self.assertTrue(config.evolution_trace.include_code) + self.assertFalse(config.evolution_trace.include_prompts) + + def test_yaml_config(self): + """Test loading evolution trace config from YAML""" + import yaml + from openevolve.config import Config + + yaml_content = """ + evolution_trace: + enabled: true + format: json + include_code: false + include_prompts: true + output_path: /tmp/evolution_trace.json + buffer_size: 20 + compress: false + """ + + config_dict = yaml.safe_load(yaml_content) + config = Config.from_dict(config_dict) + + self.assertTrue(config.evolution_trace.enabled) + self.assertEqual(config.evolution_trace.format, "json") + self.assertFalse(config.evolution_trace.include_code) + self.assertTrue(config.evolution_trace.include_prompts) + self.assertEqual(config.evolution_trace.buffer_size, 20) + + +class TestCheckpointExtraction(unittest.TestCase): + """Test checkpoint extraction functionality""" + + def setUp(self): + """Set up test fixtures""" + self.temp_dir = tempfile.mkdtemp() + self.checkpoint_dir = 
Path(self.temp_dir) / "checkpoint" + self.programs_dir = self.checkpoint_dir / "programs" + self.programs_dir.mkdir(parents=True, exist_ok=True) + + def tearDown(self): + """Clean up test fixtures""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _create_test_program(self, prog_id, parent_id=None, iteration=0, metrics=None, generation=0): + """Helper to create a test program JSON file""" + program_data = { + "id": prog_id, + "code": f"def func_{prog_id}(): pass", + "language": "python", + "parent_id": parent_id, + "iteration_found": iteration, + "generation": generation, + "timestamp": 1234567890.0 + iteration, + "metrics": metrics or {"score": 0.5, "accuracy": 0.8}, + "metadata": { + "island": iteration % 3, + "changes": f"Changes for {prog_id}" + } + } + + program_file = self.programs_dir / f"{prog_id}.json" + with open(program_file, "w") as f: + json.dump(program_data, f) + + return program_data + + def test_extract_from_empty_checkpoint(self): + """Test extraction from checkpoint with no programs""" + traces = extract_evolution_trace_from_checkpoint(self.checkpoint_dir) + self.assertEqual(len(traces), 0) + + def test_extract_single_parent_child(self): + """Test extraction with single parent-child pair""" + # Create parent and child programs + parent = self._create_test_program("parent-1", None, 0, {"score": 0.5}, 0) + child = self._create_test_program("child-1", "parent-1", 1, {"score": 0.6}, 1) + + traces = extract_evolution_trace_from_checkpoint(self.checkpoint_dir) + + self.assertEqual(len(traces), 1) + trace = traces[0] + + self.assertEqual(trace.parent_id, "parent-1") + self.assertEqual(trace.child_id, "child-1") + self.assertEqual(trace.iteration, 1) + self.assertEqual(trace.generation, 1) + self.assertEqual(trace.parent_metrics["score"], 0.5) + self.assertEqual(trace.child_metrics["score"], 0.6) + + def test_extract_multiple_generations(self): + """Test extraction with multiple generations""" + # Create a chain of programs + prog1 = self._create_test_program("prog-1", None, 0, {"score": 0.5}, 0) + prog2 = self._create_test_program("prog-2", "prog-1", 1, {"score": 0.6}, 1) + prog3 = self._create_test_program("prog-3", "prog-2", 2, {"score": 0.7}, 2) + prog4 = self._create_test_program("prog-4", "prog-2", 3, {"score": 0.65}, 2) + + traces = extract_evolution_trace_from_checkpoint(self.checkpoint_dir) + + self.assertEqual(len(traces), 3) # Three parent-child pairs + + # Verify traces are sorted by iteration + iterations = [t.iteration for t in traces] + self.assertEqual(iterations, sorted(iterations)) + + def test_extract_with_code_inclusion(self): + """Test extraction with code included""" + parent = self._create_test_program("parent-1", None, 0) + child = self._create_test_program("child-1", "parent-1", 1) + + traces = extract_evolution_trace_from_checkpoint( + self.checkpoint_dir, + include_code=True + ) + + self.assertEqual(len(traces), 1) + trace = traces[0] + + self.assertIn("func_parent-1", trace.parent_code) + self.assertIn("func_child-1", trace.child_code) + + def test_extract_without_code(self): + """Test extraction without code""" + parent = self._create_test_program("parent-1", None, 0) + child = self._create_test_program("child-1", "parent-1", 1) + + traces = extract_evolution_trace_from_checkpoint( + self.checkpoint_dir, + include_code=False + ) + + trace = traces[0] + self.assertIsNone(trace.parent_code) + self.assertIsNone(trace.child_code) + + def test_save_extracted_traces_jsonl(self): + """Test saving extracted traces in JSONL 
format""" + parent = self._create_test_program("parent-1", None, 0) + child = self._create_test_program("child-1", "parent-1", 1) + + output_path = Path(self.temp_dir) / "extracted.jsonl" + + traces = extract_evolution_trace_from_checkpoint( + self.checkpoint_dir, + output_path=str(output_path), + format="jsonl" + ) + + # Verify file was created + self.assertTrue(output_path.exists()) + + # Read and verify content + with open(output_path, "r") as f: + lines = f.readlines() + + self.assertEqual(len(lines), 1) + data = json.loads(lines[0]) + self.assertEqual(data["parent_id"], "parent-1") + self.assertEqual(data["child_id"], "child-1") + + def test_save_extracted_traces_json(self): + """Test saving extracted traces in JSON format""" + parent = self._create_test_program("parent-1", None, 0) + child = self._create_test_program("child-1", "parent-1", 1) + + output_path = Path(self.temp_dir) / "extracted.json" + + traces = extract_evolution_trace_from_checkpoint( + self.checkpoint_dir, + output_path=str(output_path), + format="json" + ) + + # Verify file was created + self.assertTrue(output_path.exists()) + + # Read and verify content + with open(output_path, "r") as f: + data = json.load(f) + + self.assertIn("metadata", data) + self.assertIn("traces", data) + self.assertEqual(len(data["traces"]), 1) + self.assertEqual(data["traces"][0]["parent_id"], "parent-1") + + def test_extract_full_lineage(self): + """Test full lineage extraction""" + # Create a chain of programs with prompts + prog1 = self._create_test_program("prog-1", None, 0, {"score": 0.5}, 0) + prog2_data = self._create_test_program("prog-2", "prog-1", 1, {"score": 0.6}, 1) + prog3_data = self._create_test_program("prog-3", "prog-2", 2, {"score": 0.7}, 2) + + # Add prompts to the child programs + prog2_file = self.programs_dir / "prog-2.json" + prog2_data["prompts"] = { + "diff_user": { + "system": "System prompt for prog-2", + "user": "User prompt for prog-2", + "responses": ["LLM response for prog-2"] + } + } + with open(prog2_file, "w") as f: + json.dump(prog2_data, f) + + prog3_file = self.programs_dir / "prog-3.json" + prog3_data["prompts"] = { + "full_rewrite_user": { + "system": "System prompt for prog-3", + "user": "User prompt for prog-3", + "responses": ["LLM response for prog-3"] + } + } + with open(prog3_file, "w") as f: + json.dump(prog3_data, f) + + # Extract full lineage traces + traces = extract_full_lineage_traces(self.checkpoint_dir) + + # Should have one trace (for prog-3, which has the longest lineage) + self.assertGreaterEqual(len(traces), 1) + + # Find the trace for prog-3 + prog3_trace = None + for trace in traces: + if trace["final_program_id"] == "prog-3": + prog3_trace = trace + break + + self.assertIsNotNone(prog3_trace) + self.assertEqual(prog3_trace["generation_depth"], 3) + self.assertEqual(len(prog3_trace["improvement_steps"]), 2) + + # Check first improvement step (prog-1 to prog-2) + step1 = prog3_trace["improvement_steps"][0] + self.assertEqual(step1["parent_id"], "prog-1") + self.assertEqual(step1["child_id"], "prog-2") + self.assertIsNotNone(step1["action"]) + self.assertEqual(step1["action"]["template"], "diff_user") + self.assertIn("LLM response for prog-2", step1["action"]["llm_response"]) + + # Check improvement calculation + self.assertIn("score", step1["improvement"]) + self.assertAlmostEqual(step1["improvement"]["score"], 0.1) + + def test_extract_full_lineage_json_output(self): + """Test full lineage extraction with JSON output""" + # Create simple lineage + 
self._create_test_program("parent", None, 0, {"score": 0.5}, 0) + child_data = self._create_test_program("child", "parent", 1, {"score": 0.6}, 1) + + # Add prompts + child_file = self.programs_dir / "child.json" + child_data["prompts"] = { + "test_template": { + "system": "Test system", + "user": "Test user", + "responses": ["Test response"] + } + } + with open(child_file, "w") as f: + json.dump(child_data, f) + + output_path = Path(self.temp_dir) / "lineage.json" + + traces = extract_full_lineage_traces( + self.checkpoint_dir, + output_path=str(output_path), + format="json" + ) + + # Verify output file was created + self.assertTrue(output_path.exists()) + + # Load and verify content + with open(output_path, "r") as f: + data = json.load(f) + + self.assertIn("metadata", data) + self.assertIn("traces", data) + self.assertEqual(data["metadata"]["type"], "full_lineage") + + def test_extract_with_missing_parent(self): + """Test extraction handles missing parents gracefully""" + # Create child without parent in checkpoint + child = self._create_test_program("child-1", "missing-parent", 1) + + traces = extract_evolution_trace_from_checkpoint(self.checkpoint_dir) + + # Should skip this program since parent is missing + self.assertEqual(len(traces), 0) + + def test_extract_with_corrupted_file(self): + """Test extraction handles corrupted files gracefully""" + # Create a valid program + parent = self._create_test_program("parent-1", None, 0) + child = self._create_test_program("child-1", "parent-1", 1) + + # Create a corrupted file + corrupted_file = self.programs_dir / "corrupted.json" + with open(corrupted_file, "w") as f: + f.write("not valid json {") + + # Should still extract valid programs + traces = extract_evolution_trace_from_checkpoint(self.checkpoint_dir) + self.assertEqual(len(traces), 1) + + def test_checkpoint_not_found(self): + """Test error handling for non-existent checkpoint""" + with self.assertRaises(FileNotFoundError): + extract_evolution_trace_from_checkpoint("/nonexistent/path") + + def test_programs_dir_not_found(self): + """Test error handling when programs directory is missing""" + # Create checkpoint dir without programs subdirectory + empty_checkpoint = Path(self.temp_dir) / "empty_checkpoint" + empty_checkpoint.mkdir(parents=True, exist_ok=True) + + with self.assertRaises(FileNotFoundError): + extract_evolution_trace_from_checkpoint(empty_checkpoint) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file
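Reviewer note: a minimal, hedged sketch of driving the new `EvolutionTracer` directly, outside the controller wiring added above. The `Program` keyword arguments mirror `tests/test_evolution_trace.py` in this diff; the output path, the `combined_score` metric name, and the `changes` string are illustrative assumptions, not part of the patch.

```python
from openevolve.database import Program
from openevolve.evolution_trace import EvolutionTracer

# Parent/child pair; keyword arguments follow tests/test_evolution_trace.py above.
parent = Program(
    id="parent-123",
    code="def f(): return 1",
    language="python",
    metrics={"combined_score": 0.5},
    generation=1,
)
child = Program(
    id="child-456",
    code="def f(): return 2",
    language="python",
    parent_id="parent-123",
    metrics={"combined_score": 0.6},
    generation=2,
)

# buffer_size=1 forces an immediate write; close() is handled by the context manager.
with EvolutionTracer(
    output_path="trace.jsonl",  # illustrative path
    format="jsonl",
    include_code=True,
    buffer_size=1,
) as tracer:
    tracer.log_trace(
        iteration=1,
        parent_program=parent,
        child_program=child,
        island_id=0,
        metadata={"changes": "tweaked return value"},
    )
    stats = tracer.get_statistics()
    print(stats["total_traces"], f"{stats['improvement_rate']:.0%}")  # 1 100%
```

In the normal run path this is all driven by the `evolution_trace` block in the config, which the controller uses to construct the tracer and hand it to `ProcessParallelController`.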
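A similar sketch for the offline path, using `extract_evolution_trace_from_checkpoint` and `load_traces_jsonl` as defined in this diff; the checkpoint directory is a hypothetical example path, and the `combined_score` key assumes the evaluator reports that metric.

```python
from openevolve.evolution_trace import extract_evolution_trace_from_checkpoint
from openevolve.utils.trace_export_utils import load_traces_jsonl

# Rebuild parent->child transitions from a saved run; the checkpoint path is hypothetical.
traces = extract_evolution_trace_from_checkpoint(
    "output/checkpoints/checkpoint_100",
    output_path="extracted_trace.jsonl",
    format="jsonl",
    include_code=True,
)

# Reload the JSONL file and count transitions that improved combined_score.
rows = load_traces_jsonl("extracted_trace.jsonl")
improved = [r for r in rows if r.get("improvement_delta", {}).get("combined_score", 0) > 0]
print(f"{len(improved)}/{len(rows)} transitions improved combined_score")
```

Because `EvolutionTrace.to_dict()` drops `None` fields, reloaded rows may omit keys such as `improvement_delta` or `llm_response`; downstream consumers should treat every optional field as absent-by-default, as the `.get(...)` calls above do.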