diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml index d60923a0f..d6ff498fa 100644 --- a/.github/workflows/python-test.yml +++ b/.github/workflows/python-test.yml @@ -18,8 +18,12 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -e ".[dev]" + pip install -e . + # Install test dependencies + pip install pytest numpy - name: Run unit tests + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} run: | - python -m unittest discover -s tests -p "*.py" \ No newline at end of file + python -m unittest discover -s tests -p "test_*.py" -v \ No newline at end of file diff --git a/README.md b/README.md index 90431d6aa..d94ddd528 100644 --- a/README.md +++ b/README.md @@ -161,6 +161,18 @@ See the [Configuration Guide](configs/default_config.yaml) for a full list of op See the `examples/` directory for complete examples of using OpenEvolve on various problems: +### Symbolic Regression + +A comprehensive example demonstrating OpenEvolve's application to symbolic regression tasks using the LLM-SRBench benchmark. This example shows how OpenEvolve can evolve simple mathematical expressions (like linear models) into complex symbolic formulas that accurately fit scientific datasets. + +[Explore the Symbolic Regression Example](examples/symbolic_regression/) + +Key features: +- Automatic generation of initial programs from benchmark tasks +- Evolution from simple linear models to complex mathematical expressions +- Evaluation on physics, chemistry, biology, and material science datasets +- Competitive results compared to state-of-the-art symbolic regression methods + ### Circle Packing Our implementation of the circle packing problem from the AlphaEvolve paper. For the n=26 case, where one needs to pack 26 circles in a unit square we also obtain SOTA results. diff --git a/openevolve/controller.py b/openevolve/controller.py index 5594d90f5..d090efae1 100644 --- a/openevolve/controller.py +++ b/openevolve/controller.py @@ -1,455 +1,471 @@ -""" -Main controller for OpenEvolve -""" - -import asyncio -import logging -import os -import re -import time -import uuid -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union - -from openevolve.config import Config, load_config -from openevolve.database import Program, ProgramDatabase -from openevolve.evaluator import Evaluator -from openevolve.llm.ensemble import LLMEnsemble -from openevolve.prompt.sampler import PromptSampler -from openevolve.utils.code_utils import ( - apply_diff, - extract_code_language, - extract_diffs, - format_diff_summary, - parse_evolve_blocks, - parse_full_rewrite, -) - -logger = logging.getLogger(__name__) - - -class OpenEvolve: - """ - Main controller for OpenEvolve - - Orchestrates the evolution process, coordinating between the prompt sampler, - LLM ensemble, evaluator, and program database. 
- - Features: - - Tracks the absolute best program across evolution steps - - Ensures the best solution is not lost during the MAP-Elites process - - Always includes the best program in the selection process for inspiration - - Maintains detailed logs and metadata about improvements - """ - - def __init__( - self, - initial_program_path: str, - evaluation_file: str, - config_path: Optional[str] = None, - config: Optional[Config] = None, - output_dir: Optional[str] = None, - ): - # Load configuration - if config is not None: - # Use provided Config object directly - self.config = config - else: - # Load from file or use defaults - self.config = load_config(config_path) - - # Set up output directory - self.output_dir = output_dir or os.path.join( - os.path.dirname(initial_program_path), "openevolve_output" - ) - os.makedirs(self.output_dir, exist_ok=True) - - # Set up logging - self._setup_logging() - - # Load initial program - self.initial_program_path = initial_program_path - self.initial_program_code = self._load_initial_program() - self.language = extract_code_language(self.initial_program_code) - - # Extract file extension from initial program - self.file_extension = os.path.splitext(initial_program_path)[1] - if not self.file_extension: - # Default to .py if no extension found - self.file_extension = ".py" - else: - # Make sure it starts with a dot - if not self.file_extension.startswith("."): - self.file_extension = f".{self.file_extension}" - - # Initialize components - self.llm_ensemble = LLMEnsemble(self.config.llm) - self.prompt_sampler = PromptSampler(self.config.prompt) - self.database = ProgramDatabase(self.config.database) - self.evaluator = Evaluator(self.config.evaluator, evaluation_file, self.llm_ensemble) - - logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") - - def _setup_logging(self) -> None: - """Set up logging""" - log_dir = self.config.log_dir or os.path.join(self.output_dir, "logs") - os.makedirs(log_dir, exist_ok=True) - - # Set up root logger - root_logger = logging.getLogger() - root_logger.setLevel(getattr(logging, self.config.log_level)) - - # Add file handler - log_file = os.path.join(log_dir, f"openevolve_{time.strftime('%Y%m%d_%H%M%S')}.log") - file_handler = logging.FileHandler(log_file) - file_handler.setFormatter( - logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - ) - root_logger.addHandler(file_handler) - - # Add console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) - root_logger.addHandler(console_handler) - - logger.info(f"Logging to {log_file}") - - def _load_initial_program(self) -> str: - """Load the initial program from file""" - with open(self.initial_program_path, "r") as f: - return f.read() - - async def run( - self, - iterations: Optional[int] = None, - target_score: Optional[float] = None, - ) -> Program: - """ - Run the evolution process - - Args: - iterations: Maximum number of iterations (uses config if None) - target_score: Target score to reach (continues until reached if specified) - - Returns: - Best program found - """ - max_iterations = iterations or self.config.max_iterations - - # Define start_iteration before creating the initial program - start_iteration = self.database.last_iteration - - # Initialize the database with the initial program - initial_program_id = str(uuid.uuid4()) - - # Evaluate the initial program - initial_metrics = await 
self.evaluator.evaluate_program( - self.initial_program_code, initial_program_id - ) - - initial_program = Program( - id=initial_program_id, - code=self.initial_program_code, - language=self.language, - metrics=initial_metrics, - iteration_found=start_iteration, - ) - - self.database.add(initial_program) - - # Main evolution loop - total_iterations = start_iteration + max_iterations - - logger.info( - f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" - ) - - for i in range(start_iteration, total_iterations): - iteration_start = time.time() - - # Sample parent and inspirations - parent, inspirations = self.database.sample() - - # Build prompt - prompt = self.prompt_sampler.build_prompt( - current_program=parent.code, - parent_program=parent.code, # We don't have the parent's code, use the same - program_metrics=parent.metrics, - previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], - top_programs=[p.to_dict() for p in inspirations], - language=self.language, - evolution_round=i, - allow_full_rewrite=self.config.allow_full_rewrites, - ) - - # Generate code modification - try: - llm_response = await self.llm_ensemble.generate_with_context( - system_message=prompt["system"], - messages=[{"role": "user", "content": prompt["user"]}], - ) - - # Parse the response - if self.config.diff_based_evolution: - diff_blocks = extract_diffs(llm_response) - - if not diff_blocks: - logger.warning(f"Iteration {i+1}: No valid diffs found in response") - continue - - # Apply the diffs - child_code = apply_diff(parent.code, llm_response) - changes_summary = format_diff_summary(diff_blocks) - else: - # Parse full rewrite - new_code = parse_full_rewrite(llm_response, self.language) - - if not new_code: - logger.warning(f"Iteration {i+1}: No valid code found in response") - continue - - child_code = new_code - changes_summary = "Full rewrite" - - # Check code length - if len(child_code) > self.config.max_code_length: - logger.warning( - f"Iteration {i+1}: Generated code exceeds maximum length " - f"({len(child_code)} > {self.config.max_code_length})" - ) - continue - - # Evaluate the child program - child_id = str(uuid.uuid4()) - child_metrics = await self.evaluator.evaluate_program(child_code, child_id) - - # Create a child program - child_program = Program( - id=child_id, - code=child_code, - language=self.language, - parent_id=parent.id, - generation=parent.generation + 1, - metrics=child_metrics, - metadata={ - "changes": changes_summary, - "parent_metrics": parent.metrics, - }, - ) - - # Add to database - self.database.add(child_program, iteration=i + 1) - - # Log progress - iteration_time = time.time() - iteration_start - self._log_iteration(i, parent, child_program, iteration_time) - - # Specifically check if this is the new best program - if self.database.best_program_id == child_program.id: - logger.info( - f"🌟 New best solution found at iteration {i+1}: {child_program.id}" - ) - logger.info( - f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}" - ) - - # Save checkpoint - if (i + 1) % self.config.checkpoint_interval == 0: - self._save_checkpoint(i + 1) - - # Check if target score reached - if target_score is not None: - avg_score = sum(child_metrics.values()) / max(1, len(child_metrics)) - if avg_score >= target_score: - logger.info(f"Target score {target_score} reached after {i+1} iterations") - break - - except Exception as e: - logger.error(f"Error in iteration {i+1}: 
{str(e)}") - continue - - # Get the best program using our tracking mechanism - best_program = None - if self.database.best_program_id: - best_program = self.database.get(self.database.best_program_id) - logger.info(f"Using tracked best program: {self.database.best_program_id}") - - # Fallback to calculating best program if tracked program not found - if best_program is None: - best_program = self.database.get_best_program() - logger.info("Using calculated best program (tracked program not found)") - - # Check if there's a better program by combined_score that wasn't tracked - if "combined_score" in best_program.metrics: - best_by_combined = self.database.get_best_program(metric="combined_score") - if ( - best_by_combined - and best_by_combined.id != best_program.id - and "combined_score" in best_by_combined.metrics - ): - # If the combined_score of this program is significantly better, use it instead - if ( - best_by_combined.metrics["combined_score"] - > best_program.metrics["combined_score"] + 0.02 - ): - logger.warning( - f"Found program with better combined_score: {best_by_combined.id}" - ) - logger.warning( - f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" - ) - best_program = best_by_combined - - if best_program: - logger.info( - f"Evolution complete. Best program has metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" - ) - - # Save the best program (using our tracked best program) - self._save_best_program() - - return best_program - else: - logger.warning("No valid programs found during evolution") - return initial_program - - def _log_iteration( - self, - iteration: int, - parent: Program, - child: Program, - elapsed_time: float, - ) -> None: - """ - Log iteration progress - - Args: - iteration: Iteration number - parent: Parent program - child: Child program - elapsed_time: Elapsed time in seconds - """ - # Calculate improvement - improvement = {} - for metric, value in child.metrics.items(): - if metric in parent.metrics: - diff = value - parent.metrics[metric] - improvement[metric] = diff - - improvement_str = ", ".join(f"{name}={diff:+.4f}" for name, diff in improvement.items()) - - logger.info( - f"Iteration {iteration+1}: Child {child.id} from parent {parent.id} " - f"in {elapsed_time:.2f}s. 
Metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in child.metrics.items())} " - f"(Δ: {improvement_str})" - ) - - def _save_checkpoint(self, iteration: int) -> None: - """ - Save a checkpoint - - Args: - iteration: Current iteration number - """ - checkpoint_dir = os.path.join(self.output_dir, "checkpoints") - os.makedirs(checkpoint_dir, exist_ok=True) - - # Create specific checkpoint directory - checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_{iteration}") - os.makedirs(checkpoint_path, exist_ok=True) - - # Save the database - self.database.save(checkpoint_path, iteration) - - # Save the best program found so far - best_program = None - if self.database.best_program_id: - best_program = self.database.get(self.database.best_program_id) - else: - best_program = self.database.get_best_program() - - if best_program: - # Save the best program at this checkpoint - best_program_path = os.path.join(checkpoint_path, f"best_program{self.file_extension}") - with open(best_program_path, "w") as f: - f.write(best_program.code) - - # Save metrics - best_program_info_path = os.path.join(checkpoint_path, "best_program_info.json") - with open(best_program_info_path, "w") as f: - import json - - json.dump( - { - "id": best_program.id, - "generation": best_program.generation, - "iteration": best_program.iteration_found, - "current_iteration": iteration, - "metrics": best_program.metrics, - "language": best_program.language, - "timestamp": best_program.timestamp, - "saved_at": time.time(), - }, - f, - indent=2, - ) - - logger.info( - f"Saved best program at checkpoint {iteration} with metrics: " - f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" - ) - - logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") - - def _save_best_program(self, program: Optional[Program] = None) -> None: - """ - Save the best program - - Args: - program: Best program (if None, uses the tracked best program) - """ - # If no program is provided, use the tracked best program from the database - if program is None: - if self.database.best_program_id: - program = self.database.get(self.database.best_program_id) - else: - # Fallback to calculating best program if no tracked best program - program = self.database.get_best_program() - - if not program: - logger.warning("No best program found to save") - return - - best_dir = os.path.join(self.output_dir, "best") - os.makedirs(best_dir, exist_ok=True) - - # Use the extension from the initial program file - filename = f"best_program{self.file_extension}" - code_path = os.path.join(best_dir, filename) - - with open(code_path, "w") as f: - f.write(program.code) - - # Save complete program info including metrics - info_path = os.path.join(best_dir, "best_program_info.json") - with open(info_path, "w") as f: - import json - - json.dump( - { - "id": program.id, - "generation": program.generation, - "iteration": program.iteration_found, - "timestamp": program.timestamp, - "parent_id": program.parent_id, - "metrics": program.metrics, - "language": program.language, - "saved_at": time.time(), - }, - f, - indent=2, - ) - - logger.info(f"Saved best program to {code_path} with program info to {info_path}") +""" +Main controller for OpenEvolve +""" + +import asyncio +import logging +import os +import re +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +from openevolve.config import Config, load_config +from openevolve.database import Program, 
ProgramDatabase +from openevolve.evaluator import Evaluator +from openevolve.llm.ensemble import LLMEnsemble +from openevolve.prompt.sampler import PromptSampler +from openevolve.utils.code_utils import ( + apply_diff, + extract_code_language, + extract_diffs, + format_diff_summary, + parse_evolve_blocks, + parse_full_rewrite, +) + +logger = logging.getLogger(__name__) + + +class OpenEvolve: + """ + Main controller for OpenEvolve + + Orchestrates the evolution process, coordinating between the prompt sampler, + LLM ensemble, evaluator, and program database. + + Features: + - Tracks the absolute best program across evolution steps + - Ensures the best solution is not lost during the MAP-Elites process + - Always includes the best program in the selection process for inspiration + - Maintains detailed logs and metadata about improvements + """ + + def __init__( + self, + initial_program_path: str, + evaluation_file: str, + config_path: Optional[str] = None, + config: Optional[Config] = None, + output_dir: Optional[str] = None, + ): + # Load configuration + if config is not None: + # Use provided Config object directly + self.config = config + else: + # Load from file or use defaults + self.config = load_config(config_path) + + # Set up output directory + self.output_dir = output_dir or os.path.join( + os.path.dirname(initial_program_path), "openevolve_output" + ) + os.makedirs(self.output_dir, exist_ok=True) + + # Set up logging + self._setup_logging() + + # Load initial program + self.initial_program_path = initial_program_path + self.initial_program_code = self._load_initial_program() + self.language = extract_code_language(self.initial_program_code) + + # Extract file extension from initial program + self.file_extension = os.path.splitext(initial_program_path)[1] + if not self.file_extension: + # Default to .py if no extension found + self.file_extension = ".py" + else: + # Make sure it starts with a dot + if not self.file_extension.startswith("."): + self.file_extension = f".{self.file_extension}" + + # Initialize components + self.llm_ensemble = LLMEnsemble(self.config.llm) + self.prompt_sampler = PromptSampler(self.config.prompt) + self.database = ProgramDatabase(self.config.database) + self.evaluator = Evaluator(self.config.evaluator, evaluation_file, self.llm_ensemble) + + logger.info(f"Initialized OpenEvolve with {initial_program_path} " f"and {evaluation_file}") + + def _setup_logging(self) -> None: + """Set up logging""" + log_dir = self.config.log_dir or os.path.join(self.output_dir, "logs") + os.makedirs(log_dir, exist_ok=True) + + # Set up root logger + root_logger = logging.getLogger() + root_logger.setLevel(getattr(logging, self.config.log_level)) + + # Add file handler + log_file = os.path.join(log_dir, f"openevolve_{time.strftime('%Y%m%d_%H%M%S')}.log") + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter( + logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + ) + root_logger.addHandler(file_handler) + + # Add console handler + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) + root_logger.addHandler(console_handler) + + logger.info(f"Logging to {log_file}") + + def _load_initial_program(self) -> str: + """Load the initial program from file""" + with open(self.initial_program_path, "r") as f: + return f.read() + + async def run( + self, + iterations: Optional[int] = None, + target_score: Optional[float] = None, + ) -> Program: + """ + Run the 
evolution process + + Args: + iterations: Maximum number of iterations (uses config if None) + target_score: Target score to reach (continues until reached if specified) + + Returns: + Best program found + """ + max_iterations = iterations or self.config.max_iterations + + # Define start_iteration before creating the initial program + start_iteration = self.database.last_iteration + + # Only add initial program if starting fresh (not resuming from checkpoint) + # Check if we're resuming AND no program matches initial code to avoid pollution + should_add_initial = ( + start_iteration == 0 + and len(self.database.programs) == 0 + and not any( + p.code == self.initial_program_code for p in self.database.programs.values() + ) + ) + + if should_add_initial: + logger.info("Adding initial program to database") + initial_program_id = str(uuid.uuid4()) + + # Evaluate the initial program + initial_metrics = await self.evaluator.evaluate_program( + self.initial_program_code, initial_program_id + ) + + initial_program = Program( + id=initial_program_id, + code=self.initial_program_code, + language=self.language, + metrics=initial_metrics, + iteration_found=start_iteration, + ) + + self.database.add(initial_program) + else: + logger.info( + f"Skipping initial program addition (resuming from iteration {start_iteration} with {len(self.database.programs)} existing programs)" + ) + + # Main evolution loop + total_iterations = start_iteration + max_iterations + + logger.info( + f"Starting evolution from iteration {start_iteration} for {max_iterations} iterations (total: {total_iterations})" + ) + + for i in range(start_iteration, total_iterations): + iteration_start = time.time() + + # Sample parent and inspirations + parent, inspirations = self.database.sample() + + # Build prompt + prompt = self.prompt_sampler.build_prompt( + current_program=parent.code, + parent_program=parent.code, # We don't have the parent's code, use the same + program_metrics=parent.metrics, + previous_programs=[p.to_dict() for p in self.database.get_top_programs(3)], + top_programs=[p.to_dict() for p in inspirations], + language=self.language, + evolution_round=i, + allow_full_rewrite=self.config.allow_full_rewrites, + ) + + # Generate code modification + try: + llm_response = await self.llm_ensemble.generate_with_context( + system_message=prompt["system"], + messages=[{"role": "user", "content": prompt["user"]}], + ) + + # Parse the response + if self.config.diff_based_evolution: + diff_blocks = extract_diffs(llm_response) + + if not diff_blocks: + logger.warning(f"Iteration {i+1}: No valid diffs found in response") + continue + + # Apply the diffs + child_code = apply_diff(parent.code, llm_response) + changes_summary = format_diff_summary(diff_blocks) + else: + # Parse full rewrite + new_code = parse_full_rewrite(llm_response, self.language) + + if not new_code: + logger.warning(f"Iteration {i+1}: No valid code found in response") + continue + + child_code = new_code + changes_summary = "Full rewrite" + + # Check code length + if len(child_code) > self.config.max_code_length: + logger.warning( + f"Iteration {i+1}: Generated code exceeds maximum length " + f"({len(child_code)} > {self.config.max_code_length})" + ) + continue + + # Evaluate the child program + child_id = str(uuid.uuid4()) + child_metrics = await self.evaluator.evaluate_program(child_code, child_id) + + # Create a child program + child_program = Program( + id=child_id, + code=child_code, + language=self.language, + parent_id=parent.id, + generation=parent.generation 
+ 1, + metrics=child_metrics, + metadata={ + "changes": changes_summary, + "parent_metrics": parent.metrics, + }, + ) + + # Add to database + self.database.add(child_program, iteration=i + 1) + + # Log progress + iteration_time = time.time() - iteration_start + self._log_iteration(i, parent, child_program, iteration_time) + + # Specifically check if this is the new best program + if self.database.best_program_id == child_program.id: + logger.info( + f"🌟 New best solution found at iteration {i+1}: {child_program.id}" + ) + logger.info( + f"Metrics: {', '.join(f'{name}={value:.4f}' for name, value in child_program.metrics.items())}" + ) + + # Save checkpoint + if (i + 1) % self.config.checkpoint_interval == 0: + self._save_checkpoint(i + 1) + + # Check if target score reached + if target_score is not None: + avg_score = sum(child_metrics.values()) / max(1, len(child_metrics)) + if avg_score >= target_score: + logger.info(f"Target score {target_score} reached after {i+1} iterations") + break + + except Exception as e: + logger.error(f"Error in iteration {i+1}: {str(e)}") + continue + + # Get the best program using our tracking mechanism + best_program = None + if self.database.best_program_id: + best_program = self.database.get(self.database.best_program_id) + logger.info(f"Using tracked best program: {self.database.best_program_id}") + + # Fallback to calculating best program if tracked program not found + if best_program is None: + best_program = self.database.get_best_program() + logger.info("Using calculated best program (tracked program not found)") + + # Check if there's a better program by combined_score that wasn't tracked + if "combined_score" in best_program.metrics: + best_by_combined = self.database.get_best_program(metric="combined_score") + if ( + best_by_combined + and best_by_combined.id != best_program.id + and "combined_score" in best_by_combined.metrics + ): + # If the combined_score of this program is significantly better, use it instead + if ( + best_by_combined.metrics["combined_score"] + > best_program.metrics["combined_score"] + 0.02 + ): + logger.warning( + f"Found program with better combined_score: {best_by_combined.id}" + ) + logger.warning( + f"Score difference: {best_program.metrics['combined_score']:.4f} vs {best_by_combined.metrics['combined_score']:.4f}" + ) + best_program = best_by_combined + + if best_program: + logger.info( + f"Evolution complete. Best program has metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" + ) + + # Save the best program (using our tracked best program) + self._save_best_program() + + return best_program + else: + logger.warning("No valid programs found during evolution") + # Return None if no programs found instead of undefined initial_program + return None + + def _log_iteration( + self, + iteration: int, + parent: Program, + child: Program, + elapsed_time: float, + ) -> None: + """ + Log iteration progress + + Args: + iteration: Iteration number + parent: Parent program + child: Child program + elapsed_time: Elapsed time in seconds + """ + # Calculate improvement + improvement = {} + for metric, value in child.metrics.items(): + if metric in parent.metrics: + diff = value - parent.metrics[metric] + improvement[metric] = diff + + improvement_str = ", ".join(f"{name}={diff:+.4f}" for name, diff in improvement.items()) + + logger.info( + f"Iteration {iteration+1}: Child {child.id} from parent {parent.id} " + f"in {elapsed_time:.2f}s. 
Metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in child.metrics.items())} " + f"(Δ: {improvement_str})" + ) + + def _save_checkpoint(self, iteration: int) -> None: + """ + Save a checkpoint + + Args: + iteration: Current iteration number + """ + checkpoint_dir = os.path.join(self.output_dir, "checkpoints") + os.makedirs(checkpoint_dir, exist_ok=True) + + # Create specific checkpoint directory + checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_{iteration}") + os.makedirs(checkpoint_path, exist_ok=True) + + # Save the database + self.database.save(checkpoint_path, iteration) + + # Save the best program found so far + best_program = None + if self.database.best_program_id: + best_program = self.database.get(self.database.best_program_id) + else: + best_program = self.database.get_best_program() + + if best_program: + # Save the best program at this checkpoint + best_program_path = os.path.join(checkpoint_path, f"best_program{self.file_extension}") + with open(best_program_path, "w") as f: + f.write(best_program.code) + + # Save metrics + best_program_info_path = os.path.join(checkpoint_path, "best_program_info.json") + with open(best_program_info_path, "w") as f: + import json + + json.dump( + { + "id": best_program.id, + "generation": best_program.generation, + "iteration": best_program.iteration_found, + "current_iteration": iteration, + "metrics": best_program.metrics, + "language": best_program.language, + "timestamp": best_program.timestamp, + "saved_at": time.time(), + }, + f, + indent=2, + ) + + logger.info( + f"Saved best program at checkpoint {iteration} with metrics: " + f"{', '.join(f'{name}={value:.4f}' for name, value in best_program.metrics.items())}" + ) + + logger.info(f"Saved checkpoint at iteration {iteration} to {checkpoint_path}") + + def _save_best_program(self, program: Optional[Program] = None) -> None: + """ + Save the best program + + Args: + program: Best program (if None, uses the tracked best program) + """ + # If no program is provided, use the tracked best program from the database + if program is None: + if self.database.best_program_id: + program = self.database.get(self.database.best_program_id) + else: + # Fallback to calculating best program if no tracked best program + program = self.database.get_best_program() + + if not program: + logger.warning("No best program found to save") + return + + best_dir = os.path.join(self.output_dir, "best") + os.makedirs(best_dir, exist_ok=True) + + # Use the extension from the initial program file + filename = f"best_program{self.file_extension}" + code_path = os.path.join(best_dir, filename) + + with open(code_path, "w") as f: + f.write(program.code) + + # Save complete program info including metrics + info_path = os.path.join(best_dir, "best_program_info.json") + with open(info_path, "w") as f: + import json + + json.dump( + { + "id": program.id, + "generation": program.generation, + "iteration": program.iteration_found, + "timestamp": program.timestamp, + "parent_id": program.parent_id, + "metrics": program.metrics, + "language": program.language, + "saved_at": time.time(), + }, + f, + indent=2, + ) + + logger.info(f"Saved best program to {code_path} with program info to {info_path}") diff --git a/tests/test_checkpoint_resume.py b/tests/test_checkpoint_resume.py new file mode 100644 index 000000000..08baaf956 --- /dev/null +++ b/tests/test_checkpoint_resume.py @@ -0,0 +1,304 @@ +""" +Tests for checkpoint resume functionality and initial program deduplication +""" + +import asyncio +import os 
+import tempfile +import unittest +from unittest.mock import AsyncMock, MagicMock, patch +import json +import time + +# Set dummy API key for testing to prevent OpenAI SDK import failures +os.environ["OPENAI_API_KEY"] = "test" + +from openevolve.config import Config +from openevolve.controller import OpenEvolve +from openevolve.database import Program, ProgramDatabase + + +class MockEvaluator: + """Mock evaluator for testing""" + + def __init__(self): + self.call_count = 0 + + async def evaluate_program(self, code, program_id): + """Mock evaluation that returns predictable metrics""" + self.call_count += 1 + # Return slightly different metrics each time to simulate real evaluation + return { + "score": 0.5 + (self.call_count * 0.1) % 0.5, + "combined_score": 0.6 + (self.call_count * 0.05) % 0.4, + } + + +class TestCheckpointResume(unittest.TestCase): + """Tests for checkpoint resume functionality""" + + def setUp(self): + """Set up test environment""" + self.test_dir = tempfile.mkdtemp() + + # Create a simple test program + self.test_program_content = """# EVOLVE-BLOCK-START +def test_function(): + return "test" +# EVOLVE-BLOCK-END + +def main(): + return test_function() +""" + + self.test_program_path = os.path.join(self.test_dir, "test_program.py") + with open(self.test_program_path, "w") as f: + f.write(self.test_program_content) + + # Create a simple evaluator file + self.evaluator_content = """ +def evaluate(program_path): + return {"score": 0.5, "combined_score": 0.6} +""" + + self.evaluator_path = os.path.join(self.test_dir, "evaluator.py") + with open(self.evaluator_path, "w") as f: + f.write(self.evaluator_content) + + # Create test config + self.config = Config() + self.config.max_iterations = 2 # Keep tests fast + self.config.checkpoint_interval = 1 + self.config.database.in_memory = True + + def tearDown(self): + """Clean up test environment""" + import shutil + + shutil.rmtree(self.test_dir, ignore_errors=True) + + def test_fresh_start_adds_initial_program(self): + """Test that initial program is added when starting fresh""" + + async def run_test(): + with patch("openevolve.controller.Evaluator") as mock_evaluator_class: + mock_evaluator = MockEvaluator() + mock_evaluator_class.return_value = mock_evaluator + + controller = OpenEvolve( + initial_program_path=self.test_program_path, + evaluation_file=self.evaluator_path, + config=self.config, + output_dir=self.test_dir, + ) + + # Verify database is empty initially + self.assertEqual(len(controller.database.programs), 0) + self.assertEqual(controller.database.last_iteration, 0) + + # Mock the LLM to avoid actual API calls + with patch.object(controller.llm_ensemble, "generate_with_context") as mock_llm: + mock_llm.return_value = "No changes needed" + + # Run for 0 iterations (just initialization) + result = await controller.run(iterations=0) + + # Verify initial program was added + self.assertEqual(len(controller.database.programs), 1) + + # Verify the initial program has the correct content + programs = list(controller.database.programs.values()) + initial_program = programs[0] + self.assertEqual(initial_program.code, self.test_program_content) + self.assertEqual(initial_program.iteration_found, 0) + + # Verify evaluator was called exactly once for initial program + self.assertEqual(mock_evaluator.call_count, 1) + + # Run the async test + asyncio.run(run_test()) + + def test_duplicate_content_prevention(self): + """Test that programs with identical content are not added multiple times""" + + async def run_test(): + with 
patch("openevolve.controller.Evaluator") as mock_evaluator_class: + mock_evaluator = MockEvaluator() + mock_evaluator_class.return_value = mock_evaluator + + controller = OpenEvolve( + initial_program_path=self.test_program_path, + evaluation_file=self.evaluator_path, + config=self.config, + output_dir=self.test_dir, + ) + + # Add a program with different ID but same content as initial program + existing_program = Program( + id="different_id", + code=self.test_program_content, # Same content as initial program + language="python", + metrics={"score": 0.7, "combined_score": 0.8}, + iteration_found=0, + ) + + controller.database.add(existing_program) + + # Mock the LLM to avoid actual API calls + with patch.object(controller.llm_ensemble, "generate_with_context") as mock_llm: + mock_llm.return_value = "No changes needed" + + # Run for 0 iterations (just initialization) + result = await controller.run(iterations=0) + + # Verify no additional program was added (still only 1 program) + self.assertEqual(len(controller.database.programs), 1) + + # Verify the existing program is still there + self.assertIn("different_id", controller.database.programs) + + # Verify evaluator was not called for initial program + self.assertEqual(mock_evaluator.call_count, 0) + + def test_checkpoint_resume_skips_initial_program(self): + """Test that initial program is not re-added when resuming from checkpoint""" + + async def run_test(): + with patch("openevolve.controller.Evaluator") as mock_evaluator_class: + mock_evaluator = MockEvaluator() + mock_evaluator_class.return_value = mock_evaluator + + controller = OpenEvolve( + initial_program_path=self.test_program_path, + evaluation_file=self.evaluator_path, + config=self.config, + output_dir=self.test_dir, + ) + + # Simulate existing database state (as if loaded from checkpoint) + existing_program = Program( + id="existing_program_id", + code=self.test_program_content, # Same content as initial program + language="python", + metrics={"score": 0.7, "combined_score": 0.8}, + iteration_found=5, + ) + + controller.database.add(existing_program) + controller.database.last_iteration = 10 # Simulate resuming from iteration 10 + + # Verify database has the existing program + self.assertEqual(len(controller.database.programs), 1) + self.assertEqual(controller.database.last_iteration, 10) + + # Mock the LLM to avoid actual API calls + with patch.object(controller.llm_ensemble, "generate_with_context") as mock_llm: + mock_llm.return_value = "No changes needed" + + # Run for 0 iterations (just initialization) + result = await controller.run(iterations=0) + + # Verify no additional program was added (still only 1 program) + self.assertEqual(len(controller.database.programs), 1) + + # Verify the existing program is still there with original ID + self.assertIn("existing_program_id", controller.database.programs) + + # Verify evaluator was not called for initial program (count should be 0) + self.assertEqual(mock_evaluator.call_count, 0) + + # Run the async test + asyncio.run(run_test()) + + def test_non_empty_database_at_iteration_zero(self): + """Test that initial program is not added when database already has programs at iteration 0""" + + async def run_test(): + with patch("openevolve.controller.Evaluator") as mock_evaluator_class: + mock_evaluator = MockEvaluator() + mock_evaluator_class.return_value = mock_evaluator + + controller = OpenEvolve( + initial_program_path=self.test_program_path, + evaluation_file=self.evaluator_path, + config=self.config, + 
output_dir=self.test_dir, + ) + + # Add a program with different content to simulate pre-populated database + different_program = Program( + id="different_id", + code="def different_function(): pass", # Different content + language="python", + metrics={"score": 0.6, "combined_score": 0.7}, + iteration_found=0, + ) + + controller.database.add(different_program) + # Keep last_iteration at 0 to simulate fresh start with pre-populated DB + + # Verify database has the different program + self.assertEqual(len(controller.database.programs), 1) + self.assertEqual(controller.database.last_iteration, 0) + + # Mock the LLM to avoid actual API calls + with patch.object(controller.llm_ensemble, "generate_with_context") as mock_llm: + mock_llm.return_value = "No changes needed" + + # Run for 0 iterations (just initialization) + result = await controller.run(iterations=0) + + # Verify no additional program was added (still only 1 program) + self.assertEqual(len(controller.database.programs), 1) + + # Verify the existing program is still there + self.assertIn("different_id", controller.database.programs) + + # Verify evaluator was not called for initial program + self.assertEqual(mock_evaluator.call_count, 0) + + # Run the async test + asyncio.run(run_test()) + + def test_multiple_run_calls_no_pollution(self): + """Test that calling run() multiple times doesn't pollute the database""" + + async def run_test(): + with patch("openevolve.controller.Evaluator") as mock_evaluator_class: + mock_evaluator = MockEvaluator() + mock_evaluator_class.return_value = mock_evaluator + + controller = OpenEvolve( + initial_program_path=self.test_program_path, + evaluation_file=self.evaluator_path, + config=self.config, + output_dir=self.test_dir, + ) + + # Mock the LLM to avoid actual API calls + with patch.object(controller.llm_ensemble, "generate_with_context") as mock_llm: + mock_llm.return_value = "No changes needed" + + # Run first time + result1 = await controller.run(iterations=0) + initial_count = len(controller.database.programs) + evaluator_calls_after_first = mock_evaluator.call_count + + # Run second time (simulating resume or restart) + result2 = await controller.run(iterations=0) + + # Run third time + result3 = await controller.run(iterations=0) + + # Verify database size didn't grow + self.assertEqual(len(controller.database.programs), initial_count) + + # Verify evaluator was only called once (for the initial program in first run) + self.assertEqual(mock_evaluator.call_count, evaluator_calls_after_first) + + # Run the async test + asyncio.run(run_test()) + + +if __name__ == "__main__": + unittest.main()
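
The core behavioral change in `OpenEvolve.run` above is the guard that decides whether the initial program is evaluated and inserted at all. A minimal sketch of that decision, restated as a standalone helper for illustration only (the helper name and the plain-dict program store are assumptions, not part of this PR):

```python
from typing import Any, Dict


def should_seed_initial_program(
    start_iteration: int,
    programs: Dict[str, Any],  # values are Program records exposing a .code attribute
    initial_code: str,
) -> bool:
    """Mirror of the guard in OpenEvolve.run: seed only on a truly fresh start."""
    if start_iteration != 0:
        # Resuming from a checkpoint: the database already holds the lineage,
        # so re-adding (and re-evaluating) the initial program would pollute it.
        return False
    if len(programs) > 0:
        # Iteration 0 but the database is already populated: treat it as seeded.
        return False
    # Defensive duplicate check carried over from the PR's condition; with an
    # empty store this branch is always True.
    return not any(p.code == initial_code for p in programs.values())
```

Calling this with `self.database.last_iteration`, `self.database.programs`, and `self.initial_program_code` reproduces the `should_add_initial` condition used in the controller.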
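
With the seeding guard in place, `initial_program` is no longer guaranteed to exist inside `run`, so the no-programs fallback now returns `None` instead. Callers should therefore guard the result; a hedged usage sketch, where the file paths are illustrative placeholders rather than files known to exist in the repository:

```python
import asyncio

from openevolve.controller import OpenEvolve


async def main() -> None:
    controller = OpenEvolve(
        initial_program_path="initial_program.py",  # illustrative placeholder path
        evaluation_file="evaluator.py",             # illustrative placeholder path
    )
    best = await controller.run(iterations=10)
    if best is None:
        # Possible when every iteration fails or the database stays empty.
        print("Evolution finished without producing a valid program")
        return
    print("Best program metrics:", best.metrics)


if __name__ == "__main__":
    asyncio.run(main())
```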
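
The updated workflow installs `pytest` and `numpy` explicitly, exports `OPENAI_API_KEY` from the repository secrets, and narrows unittest discovery to `test_*.py`. The same discovery rule can be driven from Python for a local run; a sketch, assuming the dummy key that `tests/test_checkpoint_resume.py` already sets for itself is acceptable for any other module that needs one at import time:

```python
import os
import sys
import unittest

# Mirror the test file's dummy key so imports that touch the OpenAI SDK do not fail.
os.environ.setdefault("OPENAI_API_KEY", "test")

# Same pattern as the CI step: python -m unittest discover -s tests -p "test_*.py" -v
suite = unittest.defaultTestLoader.discover(start_dir="tests", pattern="test_*.py")
result = unittest.TextTestRunner(verbosity=2).run(suite)
sys.exit(0 if result.wasSuccessful() else 1)
```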