diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 1fb418f..d08bc0f 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -22,7 +22,7 @@ import random import time import uuid -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union from ldai import AIAgentConfig, AIJudgeConfig, AIJudgeConfigDefault, LDAIClient from ldai.models import LDMessage, ModelConfig @@ -132,8 +132,30 @@ def _compute_validation_count(pool_size: int) -> int: # Cost gate: a candidate must cost at most this fraction of the baseline # (history[0].estimated_cost_usd) to pass when acceptance criteria imply a -# cost reduction goal. 0.80 means at least 20% cheaper than the baseline. -_COST_TOLERANCE = 0.80 +# cost reduction goal. 0.90 means at least 10% cheaper than the baseline. +_COST_TOLERANCE = 0.90 + +# Maximum number of history items retained in the standard (non-GT) optimizer. +# Since user inputs are randomly selected there is no "full pass" concept, so +# a small fixed window is sufficient context for variation generation. +_MAX_STANDARD_HISTORY_LENGTH = 5 + + +def _trim_history( + history: List["OptimizationContext"], max_len: int +) -> List["OptimizationContext"]: + """Trim history to at most max_len of the most recent items. + + The duration/cost baselines are captured explicitly in ``_baseline_duration_ms`` + and ``_baseline_cost_usd`` so the oldest entry no longer needs to be preserved. + + :param history: Current accumulated history list. + :param max_len: Maximum number of items to retain (must be >= 1). + :return: Trimmed history list, or the original list if already within limit. + """ + if len(history) <= max_len: + return history + return history[-max_len:] # Maps SDK status strings to the API status/activity values expected by # agent_optimization_result records. Defined at module level to avoid @@ -195,6 +217,23 @@ def _initialize_class_members_from_config( agent_config.model.name if agent_config.model else None ) self._history: List[OptimizationContext] = [] + # Explicit baseline captured from the first iteration ever appended to history. + # Stored separately so that history truncation can be a pure slice without + # having to preserve history[0] as an anchor. + self._baseline_duration_ms: Optional[float] = None + self._baseline_cost_usd: Optional[float] = None + + def _record_baseline(self, ctx: OptimizationContext) -> None: + """Capture duration/cost baseline from the first iteration appended to history. + + Called once per run (subsequent calls are no-ops once both values are set). + Storing these explicitly lets ``_trim_history`` use a simple tail slice without + needing to preserve ``history[0]`` as an anchor. 
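+
+        Example: if the first appended context has ``duration_ms`` but no
+        ``estimated_cost_usd``, only the duration baseline is captured here; the cost
+        baseline is then taken from the first later context that reports a cost.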
+ """ + if self._baseline_duration_ms is None and ctx.duration_ms is not None: + self._baseline_duration_ms = ctx.duration_ms + if self._baseline_cost_usd is None and ctx.estimated_cost_usd is not None: + self._baseline_cost_usd = ctx.estimated_cost_usd def _build_agent_config_for_context( self, ctx: OptimizationContext, skip_interpolation: bool = False @@ -260,6 +299,7 @@ def _create_optimization_context( user_input=user_input, history=tuple(flat_history), iteration=iteration, + accumulated_token_usage=self._total_token_usage if self._total_token_usage > 0 else None, ) @property @@ -755,11 +795,7 @@ async def _evaluate_acceptance_judge( {judge_key: optimization_judge} ) ): - baseline_ms = ( - self._history[0].duration_ms - if self._history and self._history[0].duration_ms is not None - else None - ) + baseline_ms = self._baseline_duration_ms instructions += ( f"\n\nThe acceptance criteria for this judge includes a latency/duration goal. " f"The agent's response took {agent_duration_ms:.0f}ms to generate. " @@ -785,11 +821,7 @@ async def _evaluate_acceptance_judge( agent_usage, _find_model_config(self._current_model or "", self._model_configs), ) - baseline_cost = ( - self._history[0].estimated_cost_usd - if self._history and self._history[0].estimated_cost_usd is not None - else None - ) + baseline_cost = self._baseline_cost_usd if current_cost is not None: instructions += ( f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. " @@ -1100,27 +1132,13 @@ async def _run_ground_truth_optimization( expected_response=sample.expected_response, ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", - attempt, - i + 1, - self._total_token_usage, - ) - attempt_results.append(optimize_context) - self._last_run_succeeded = False - self._last_succeeded_context = None - self._safe_status_update("failure", optimize_context, linear_iter) - if self._options.on_failing_result: - try: - self._options.on_failing_result(optimize_context) - except Exception: - logger.exception( - "[GT Attempt %d] -> on_failing_result callback failed", attempt - ) - return attempt_results + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) - # Per-sample pass/fail check + # Per-sample pass/fail check — evaluated before the token limit gate so + # that a sample which passed is not incorrectly stamped as FAILED simply + # because the budget was exhausted after its scores were computed. 
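+            # (When the budget does turn out to be exhausted, the check further below
+            # decides whether the run stops as a success or a failure.)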
if self._options.on_turn is not None: try: sample_passed = self._options.on_turn(optimize_context) @@ -1134,15 +1152,8 @@ async def _run_ground_truth_optimization( else: sample_passed = self._evaluate_response(optimize_context) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(optimize_context) - - if sample_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - sample_passed = self._evaluate_cost(optimize_context) + sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context) + sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context) if not sample_passed: logger.info( @@ -1163,6 +1174,10 @@ async def _run_ground_truth_optimization( attempt_results.append(optimize_context) + # Persist the completed sample so every API record gets its scores, + # generation tokens, and accumulated_total — not just the final one. + self._safe_status_update("turn completed", optimize_context, linear_iter) + if gt_options.on_sample_result is not None: try: gt_options.on_sample_result(optimize_context) @@ -1173,6 +1188,42 @@ async def _run_ground_truth_optimization( i + 1, ) + # Token limit check after pass/fail so the terminal status reflects + # whether the samples actually passed, not just that the budget was hit. + # Mark success only when every sample in this attempt was processed and + # all passed; stopping mid-batch is always a failure even if the + # partially-processed samples looked good. + if self._is_token_limit_exceeded(): + logger.error( + "[GT Attempt %d] -> Token limit exceeded on sample %d (total=%d)", + attempt, + i + 1, + self._total_token_usage, + ) + if all_passed and i == n - 1: + self._last_run_succeeded = True + self._last_succeeded_context = optimize_context + self._safe_status_update("success", optimize_context, linear_iter) + if self._options.on_passing_result: + try: + self._options.on_passing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_passing_result callback failed", attempt + ) + else: + self._last_run_succeeded = False + self._last_succeeded_context = None + self._safe_status_update("failure", optimize_context, linear_iter) + if self._options.on_failing_result: + try: + self._options.on_failing_result(optimize_context) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_failing_result callback failed", attempt + ) + return attempt_results + last_ctx = attempt_results[-1] if all_passed: @@ -1212,8 +1263,12 @@ async def _run_ground_truth_optimization( return attempt_results # Append all N results to history so the variation generator has full context - # from all of the previous samples + # from all of the previous samples, then trim to one full attempt's worth so + # judge prompts don't grow unboundedly across many failed attempts. 
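+        # (With n samples per attempt, at most the n most recent sample results are
+        # kept as context for the next variation.)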
+ if attempt_results: + self._record_baseline(attempt_results[0]) self._history.extend(attempt_results) + self._history = _trim_history(self._history, n) logger.info( "[GT Attempt %d] -> %d/%d samples failed — generating new variation", @@ -1393,6 +1448,7 @@ async def _generate_new_variation( optimize_for_cost = _acceptance_criteria_implies_cost_optimization( self._options.judges ) + quality_already_passing = self._all_judges_passing() instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1404,6 +1460,7 @@ async def _generate_new_variation( self._initial_instructions, optimize_for_duration=optimize_for_duration, optimize_for_cost=optimize_for_cost, + quality_already_passing=quality_already_passing, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1440,6 +1497,8 @@ async def _generate_new_variation( False, ) variation_response: OptimizationResponse = await await_if_needed(result) + if variation_response.usage is not None: + self._total_token_usage += variation_response.usage.total or 0 response_str = variation_response.output try: response_data = extract_json_from_response(response_str) @@ -1707,11 +1766,18 @@ def _persist_and_forward( if snapshot.duration_ms is not None: patch["generationLatency"] = int(snapshot.duration_ms) if snapshot.usage is not None: - patch["generationTokens"] = { + gen_tokens: Dict[str, Any] = { "total": snapshot.usage.total, "input": snapshot.usage.input, "output": snapshot.usage.output, } + if snapshot.accumulated_token_usage is not None: + gen_tokens["accumulated_total"] = snapshot.accumulated_token_usage + patch["generationTokens"] = gen_tokens + elif snapshot.accumulated_token_usage is not None: + patch["generationTokens"] = { + "accumulated_total": snapshot.accumulated_token_usage + } eval_latencies = { k: v.duration_ms for k, v in snapshot.scores.items() @@ -1948,22 +2014,22 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: """ Check whether the candidate's duration meets the improvement target vs. the baseline. - The baseline is history[0].duration_ms — the very first completed iteration, - representing the original unoptimized configuration's latency. The candidate - must be at least _DURATION_TOLERANCE faster (default: 20% improvement). + The baseline is the duration_ms from the very first iteration appended to history, + captured in ``_baseline_duration_ms``. The candidate must be at least + _DURATION_TOLERANCE faster (default: 20% improvement). - Returns True without blocking when no baseline is available (empty history or - history[0].duration_ms is None), or when the candidate's duration_ms was not - captured. This avoids penalising configurations when timing data is missing. + Returns True without blocking when no baseline is available or when the candidate's + duration_ms was not captured. This avoids penalising configurations when timing data + is missing. 
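+
+        Example: with a 2000 ms baseline and ``_DURATION_TOLERANCE`` of 0.80, a candidate
+        passes only when its ``duration_ms`` is strictly below 1600 ms.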
:param optimize_context: The completed turn context containing duration_ms :return: True if the duration requirement is met or cannot be checked """ - if not self._history or self._history[0].duration_ms is None: + if self._baseline_duration_ms is None: return True if optimize_context.duration_ms is None: return True - baseline = self._history[0].duration_ms + baseline = self._baseline_duration_ms passed = optimize_context.duration_ms < baseline * _DURATION_TOLERANCE if not passed: logger.warning( @@ -1980,26 +2046,25 @@ def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: """ Check whether the candidate's estimated cost meets the improvement target vs. the baseline. - The baseline is history[0].estimated_cost_usd — the very first completed iteration, - representing the original unoptimized configuration's cost. The candidate must be - at least _COST_TOLERANCE cheaper (default: 20% improvement). + The baseline is the estimated_cost_usd from the very first iteration appended to + history, captured in ``_baseline_cost_usd``. The candidate must be at least + _COST_TOLERANCE cheaper (default: 10% improvement). The cost value is in USD when model pricing data is available, or raw total token count as a proxy when pricing is absent. Both are comparable relative to their own baselines. - Returns True without blocking when no baseline is available (empty history or - history[0].estimated_cost_usd is None), or when the candidate's cost was not - captured. This avoids penalising configurations when cost data is missing. + Returns True without blocking when no baseline is available or when the candidate's + cost was not captured. This avoids penalising configurations when cost data is missing. :param optimize_context: The completed turn context containing estimated_cost_usd :return: True if the cost requirement is met or cannot be checked """ - if not self._history or self._history[0].estimated_cost_usd is None: + if self._baseline_cost_usd is None: return True if optimize_context.estimated_cost_usd is None: return True - baseline = self._history[0].estimated_cost_usd + baseline = self._baseline_cost_usd passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE if not passed: logger.warning( @@ -2012,6 +2077,132 @@ def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: ) return passed + def _all_judges_passing(self) -> bool: + """Return True if every user-configured judge passed in the most recent history entry. + + Inspects the last context in ``_history`` and checks each score key that + corresponds to a judge defined in ``_options.judges`` (skipping synthetic gate + entries whose keys begin with ``_``). Returns False when history is empty or any + judge score does not meet its threshold. + + This is used to decide whether variation generation should preserve the current + behavior and only optimise for cost, rather than trying to improve quality further. 
+ """ + if not self._history or not self._options.judges: + return False + recent = self._history[-1] + if not recent.scores: + return False + for key, judge in self._options.judges.items(): + result = recent.scores.get(key) + if result is None: + return False + threshold = judge.threshold if judge.threshold is not None else 1.0 + if not judge_passed(result.score, threshold, judge.is_inverted): + return False + return True + + def _apply_duration_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the latency improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies latency optimization), + evaluates whether the candidate's duration improved by at least + _DURATION_TOLERANCE vs the baseline. A synthetic ``_latency_gate`` entry is + added to scores with score=1.0 on pass or score=0.0 on fail so the outcome + is visible in the API result and UI. + + The gate is skipped (no score entry added) when: + - No acceptance statement implies latency optimization. + - ``passed_so_far`` is already False (a prior check failed the sample). + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. + :return: (passed, updated_ctx) where passed reflects gate outcome. + """ + if not _acceptance_criteria_implies_duration_optimization(self._options.judges): + return passed_so_far, ctx + if not passed_so_far: + return passed_so_far, ctx + passed = self._evaluate_duration(ctx) + if passed: + if self._baseline_duration_ms is not None and ctx.duration_ms is not None: + rationale = ( + f"Latency improvement gate passed: {ctx.duration_ms:.0f}ms is at least " + f"{int((1 - _DURATION_TOLERANCE) * 100)}% faster than baseline " + f"{self._baseline_duration_ms:.0f}ms." + ) + else: + rationale = "Latency gate passed (no baseline)." + score = 1.0 + else: + if self._baseline_duration_ms is not None and ctx.duration_ms is not None: + rationale = ( + f"Latency improvement gate failed: {ctx.duration_ms:.0f}ms did not improve " + f"by {int((1 - _DURATION_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_duration_ms:.0f}ms " + f"(required < {self._baseline_duration_ms * _DURATION_TOLERANCE:.0f}ms)." + ) + else: + rationale = "Latency gate failed (no baseline data)." + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_latency_gate": JudgeResult(score=score, rationale=rationale)}, + ) + return passed, ctx + + def _apply_cost_gate( + self, passed_so_far: bool, ctx: OptimizationContext + ) -> Tuple[bool, OptimizationContext]: + """Apply the cost improvement gate and record its result in ctx.scores. + + When the gate is active (any acceptance statement implies cost optimization), + evaluates whether the candidate's estimated cost improved by at least + _COST_TOLERANCE vs the baseline. A synthetic ``_cost_gate`` entry is + added to scores with score=1.0 on pass or score=0.0 on fail. + + The gate is skipped (no score entry added) when: + - No acceptance statement implies cost optimization. + - ``passed_so_far`` is already False (a prior check failed the sample). + + :param passed_so_far: Whether all prior checks for this sample passed. + :param ctx: Current optimization context. + :return: (passed, updated_ctx) where passed reflects gate outcome. 
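+
+        Example: with a 0.010 baseline and ``_COST_TOLERANCE`` of 0.90, a candidate
+        costing 0.007 passes (``_cost_gate`` score 1.0) while 0.0095 fails (score 0.0).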
+ """ + if not _acceptance_criteria_implies_cost_optimization(self._options.judges): + return passed_so_far, ctx + if not passed_so_far: + return passed_so_far, ctx + passed = self._evaluate_cost(ctx) + if passed: + if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: + rationale = ( + f"Cost improvement gate passed: {ctx.estimated_cost_usd:.6f} is at least " + f"{int((1 - _COST_TOLERANCE) * 100)}% cheaper than baseline " + f"{self._baseline_cost_usd:.6f}." + ) + else: + rationale = "Cost gate passed (no baseline)." + score = 1.0 + else: + if self._baseline_cost_usd is not None and ctx.estimated_cost_usd is not None: + rationale = ( + f"Cost improvement gate failed: {ctx.estimated_cost_usd:.6f} did not improve " + f"by {int((1 - _COST_TOLERANCE) * 100)}% vs baseline " + f"{self._baseline_cost_usd:.6f} " + f"(required < {self._baseline_cost_usd * _COST_TOLERANCE:.6f})." + ) + else: + rationale = "Cost gate failed (no baseline data)." + score = 0.0 + ctx = dataclasses.replace( + ctx, + scores={**ctx.scores, "_cost_gate": JudgeResult(score=score, rationale=rationale)}, + ) + return passed, ctx + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -2265,15 +2456,12 @@ async def _run_validation_phase( self._safe_status_update("generating", val_ctx, val_iter) val_ctx = await self._execute_agent_turn(val_ctx, val_iter) self._accumulate_tokens(val_ctx) - if self._is_token_limit_exceeded(): - logger.error( - "[Validation %d/%d] -> Token limit exceeded (total=%d)", - i + 1, - validation_count, - self._total_token_usage, - ) - return False, val_ctx + val_ctx = dataclasses.replace( + val_ctx, accumulated_token_usage=self._total_token_usage + ) + # Evaluate pass/fail before the token limit check so a passing validation + # sample is not incorrectly treated as a failure due to budget exhaustion. 
if options.on_turn is not None: try: sample_passed = options.on_turn(val_ctx) @@ -2285,15 +2473,17 @@ async def _run_validation_phase( else: sample_passed = self._evaluate_response(val_ctx) - if sample_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - sample_passed = self._evaluate_duration(val_ctx) + sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx) + sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx) - if sample_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - sample_passed = self._evaluate_cost(val_ctx) + if self._is_token_limit_exceeded(): + logger.error( + "[Validation %d/%d] -> Token limit exceeded (total=%d)", + i + 1, + validation_count, + self._total_token_usage, + ) + return False, val_ctx last_ctx = val_ctx @@ -2381,13 +2571,9 @@ async def _run_optimization( optimize_context, iteration ) self._accumulate_tokens(optimize_context) - if self._is_token_limit_exceeded(): - logger.error( - "[Iteration %d] -> Token limit exceeded (total=%d)", - iteration, - self._total_token_usage, - ) - return self._handle_failure(optimize_context, iteration) + optimize_context = dataclasses.replace( + optimize_context, accumulated_token_usage=self._total_token_usage + ) # Manual path: on_turn callback gives caller full control over pass/fail if self._options.on_turn is not None: @@ -2414,15 +2600,18 @@ async def _run_optimization( iteration, ) - if initial_passed and _acceptance_criteria_implies_duration_optimization( - self._options.judges - ): - initial_passed = self._evaluate_duration(optimize_context) + initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context) + initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context) - if initial_passed and _acceptance_criteria_implies_cost_optimization( - self._options.judges - ): - initial_passed = self._evaluate_cost(optimize_context) + # Token limit check after pass/fail evaluation so the persisted record + # correctly reflects whether the iteration passed before stopping the run. 
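+        # (Unlike the ground-truth path, exceeding the budget here always ends the
+        # run as a failure.)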
+ if self._is_token_limit_exceeded(): + logger.error( + "[Iteration %d] -> Token limit exceeded (total=%d)", + iteration, + self._total_token_usage, + ) + return self._handle_failure(optimize_context, iteration) if initial_passed: all_valid, last_ctx = await self._run_validation_phase( @@ -2445,7 +2634,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(last_ctx) self._history.append(last_ctx) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, last_ctx.current_variables @@ -2475,7 +2666,9 @@ async def _run_optimization( ) if iteration >= self._options.max_attempts: return self._handle_failure(optimize_context, iteration) + self._record_baseline(optimize_context) self._history.append(optimize_context) + self._history = _trim_history(self._history, _MAX_STANDARD_HISTORY_LENGTH) try: await self._generate_new_variation( iteration, optimize_context.current_variables diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index 1f5e28c..2a9f454 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -218,6 +218,7 @@ class OptimizationContext: duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens + accumulated_token_usage: Optional[int] = None # single running total across ALL calls in this run (generation + judges + variation) def copy_without_history(self) -> OptimizationContext: """ @@ -238,6 +239,7 @@ def copy_without_history(self) -> OptimizationContext: duration_ms=self.duration_ms, usage=self.usage, estimated_cost_usd=self.estimated_cost_usd, + accumulated_token_usage=self.accumulated_token_usage, ) def to_json(self) -> Dict[str, Any]: @@ -264,6 +266,7 @@ def to_json(self) -> Dict[str, Any]: "iteration": self.iteration, "duration_ms": self.duration_ms, "estimated_cost_usd": self.estimated_cost_usd, + "accumulated_token_usage": self.accumulated_token_usage, } if self.usage is not None: result["usage"] = { diff --git a/packages/optimization/src/ldai_optimizer/ld_api_client.py b/packages/optimization/src/ldai_optimizer/ld_api_client.py index 3efa725..37f6549 100644 --- a/packages/optimization/src/ldai_optimizer/ld_api_client.py +++ b/packages/optimization/src/ldai_optimizer/ld_api_client.py @@ -118,7 +118,7 @@ class AgentOptimizationResultPatch(TypedDict, total=False): completionResponse: str scores: Dict[str, Any] generationLatency: int - generationTokens: Dict[str, int] + generationTokens: Dict[str, Any] evaluationLatencies: Dict[str, float] evaluationTokens: Dict[str, Dict[str, int]] variation: Dict[str, Any] diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index eadf35d..0cc8a53 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -144,6 +144,7 @@ def build_new_variation_prompt( initial_instructions: str, optimize_for_duration: bool = False, optimize_for_cost: bool = False, + quality_already_passing: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent 
configuration. @@ -165,6 +166,9 @@ def build_new_variation_prompt( instructing the LLM to prefer faster models and simpler instructions. :param optimize_for_cost: When True, appends a cost optimization section instructing the LLM to prefer cheaper models and reduce token usage. + :param quality_already_passing: When True, signals that all judge criteria are + currently passing and the cost optimization section should instruct the LLM + to preserve existing behavior while only reducing cost. :return: The assembled prompt string """ sections = [ @@ -179,7 +183,7 @@ def build_new_variation_prompt( history, model_choices, variable_choices, initial_instructions ), variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "", - variation_prompt_cost_optimization(model_choices) if optimize_for_cost else "", + variation_prompt_cost_optimization(model_choices, quality_already_passing=quality_already_passing) if optimize_for_cost else "", ] return "\n\n".join(s for s in sections if s) @@ -595,7 +599,10 @@ def variation_prompt_duration_optimization(model_choices: List[str]) -> str: ) -def variation_prompt_cost_optimization(model_choices: List[str]) -> str: +def variation_prompt_cost_optimization( + model_choices: List[str], + quality_already_passing: bool = False, +) -> str: """ Cost optimization section of the variation prompt. @@ -604,38 +611,62 @@ def variation_prompt_cost_optimization(model_choices: List[str]) -> str: must still be met first — and provides concrete guidance on how to reduce cost through model selection and instruction simplification. + When ``quality_already_passing`` is True, the framing shifts: since all + judge criteria are already satisfied, the LLM is instructed to preserve + the existing behavior exactly and only apply changes that reduce cost + without affecting output quality. + :param model_choices: List of model IDs the LLM may select from, so it can apply its own knowledge of which models tend to be cheaper. + :param quality_already_passing: When True, signals that all judge criteria + are currently passing. The section will direct the LLM to preserve + output quality and focus exclusively on cost reduction strategies. :return: The cost optimization prompt block. """ - return "\n".join( - [ + if quality_already_passing: + intent_lines = [ "## Cost Optimization:", "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", - "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", - "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", - "Target both factors with the strategies below.", + "*** IMPORTANT: All quality acceptance criteria are currently passing. 
***", + "The goal of this variation is to reduce cost WITHOUT changing the behavior or quality of the agent's responses.", + "Do NOT alter the instructions in ways that would change what the agent says or how it reasons.", + "Only apply changes that reduce token usage or switch to a cheaper model while preserving the same output quality.", + "If you cannot reduce cost without risking quality, keep the instructions unchanged and only consider a cheaper model.", "", - "### Reducing token usage (input tokens):", - "- Remove redundant, verbose, or repeated phrasing from the instructions.", - "- Collapse multi-sentence explanations into a single concise directive.", - "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", - "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", - "- Use bullet points instead of prose where possible — they are more token-efficient.", - "", - "### Reducing token usage (output tokens):", - "- Instruct the agent to be concise and avoid unnecessary elaboration.", - "- Specify the exact format and length of the expected response (e.g. 'Respond in one sentence.').", - "- Set or reduce max_tokens if the current value allows longer responses than needed.", - "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", - "", - "### Reducing per-token cost via model selection:", - "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", - f" Available models: {model_choices}", - " Use your knowledge of relative model pricing to prefer lower-cost options.", - " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + ] + else: + intent_lines = [ + "## Cost Optimization:", + "The acceptance criteria for this optimization implies that token usage / cost should be reduced.", + "In addition to improving quality, generate a variation that aims to reduce the agent's cost.", "", - "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", - "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", ] - ) + + shared_lines = [ + "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.", + "Target both factors with the strategies below.", + "", + "### Reducing token usage (input tokens):", + "- Remove redundant, verbose, or repeated phrasing from the instructions.", + "- Collapse multi-sentence explanations into a single concise directive.", + "- Remove examples or few-shot demonstrations unless they are essential for accuracy.", + "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').", + "- Use bullet points instead of prose where possible — they are more token-efficient.", + "", + "### Reducing token usage (output tokens):", + "- Instruct the agent to be concise and avoid unnecessary elaboration.", + "- Specify the exact format and length of the expected response (e.g. 
'Respond in one sentence.').", + "- Set or reduce max_tokens if the current value allows longer responses than needed.", + "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", + "", + "### Reducing per-token cost via model selection:", + "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", + f" Available models: {model_choices}", + " Use your knowledge of relative model pricing to prefer lower-cost options.", + " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + "", + "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", + "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", + ] + + return "\n".join(intent_lines + shared_lines) diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 82e10d2..8f30c24 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -10,7 +10,13 @@ from ldai.tracker import TokenUsage from ldclient import Context -from ldai_optimizer.client import OptimizationClient, _compute_validation_count, _find_model_config +from ldai_optimizer.client import ( + OptimizationClient, + _MAX_STANDARD_HISTORY_LENGTH, + _compute_validation_count, + _find_model_config, + _trim_history, +) from ldai_optimizer.util import judge_passed from ldai_optimizer.dataclasses import ( AIJudgeCallConfig, @@ -538,7 +544,7 @@ async def test_duration_context_added_to_instructions_when_latency_keyword_prese assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): - """When history[0] has a duration, the judge instructions include a baseline comparison.""" + """When a baseline duration is captured, the judge instructions include a baseline comparison.""" self.client._history = [ OptimizationContext( scores={}, @@ -550,6 +556,7 @@ async def test_duration_context_includes_baseline_comparison_when_history_presen duration_ms=2000.0, ) ] + self.client._baseline_duration_ms = 2000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="Responses should have low latency.", @@ -581,6 +588,7 @@ async def test_duration_context_says_slower_when_candidate_is_slower(self): duration_ms=1000.0, ) ] + self.client._baseline_duration_ms = 1000.0 judge = OptimizationJudge( threshold=0.8, acceptance_statement="The response must be fast.", @@ -2686,7 +2694,12 @@ async def test_gt_stops_when_token_limit_exceeded_on_first_sample(self): assert len(results) == 1 async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): - """Token limit exceeded on the second of two samples stops after that sample.""" + """Token limit exceeded on the final sample of a batch — all passed — marks run successful. + + With 2 samples and the budget exhausted after sample 2 (the last one), and + both samples passing their judges, the run should be marked succeeded rather + than failed, because the token limit was hit on the final successful iteration. 
+ """ agent_responses = [ OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), @@ -2699,13 +2712,14 @@ async def test_gt_stops_mid_batch_when_limit_exceeded_on_second_sample(self): opts = _make_gt_options( handle_agent_call=handle_agent_call, handle_judge_call=handle_judge_call, - token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second sample + token_limit=250, # 100 + 200 = 300 ≥ 250 → trip on second (final) sample max_attempts=5, ) results = await client.optimize_from_ground_truth_options("test-agent", opts) - assert client._last_run_succeeded is False + # All samples passed; token limit hit only after the final sample → success + assert client._last_run_succeeded is True assert handle_agent_call.call_count == 2 - # Both samples processed so far are in the results + # Both samples are in the results assert len(results) == 2 async def test_gt_on_failing_result_called_on_token_limit(self): @@ -2812,6 +2826,300 @@ async def test_total_token_usage_resets_between_runs(self): assert client._last_run_succeeded is True +# --------------------------------------------------------------------------- +# History truncation +# --------------------------------------------------------------------------- + + +class TestHistoryTruncation: + """Tests that _trim_history and the optimizer correctly cap _history growth.""" + + def test_trim_history_no_op_when_under_limit(self): + ctx = OptimizationContext( + scores={}, completion_response="r", current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=1, + ) + history = [ctx, ctx, ctx] + assert _trim_history(history, 5) is history + + def test_trim_history_keeps_most_recent_items(self): + """_trim_history is a pure tail slice — it keeps the most recent max_len items.""" + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(10) + ] + result = _trim_history(ctxs, 4) + assert len(result) == 4 + # The four MOST RECENT items are retained; the oldest are discarded. + assert result[0] is ctxs[6] + assert result[-1] is ctxs[-1] + + def test_trim_history_max_len_1(self): + ctxs = [ + OptimizationContext( + scores={}, completion_response=str(i), current_instructions="i", + current_parameters={}, current_variables={}, current_model="m", iteration=i, + ) + for i in range(5) + ] + result = _trim_history(ctxs, 1) + assert len(result) == 1 + assert result[0] is ctxs[-1] + + @pytest.mark.asyncio + async def test_gt_history_trimmed_to_n_after_each_attempt(self): + """After each GT attempt _history must not exceed n (sample count).""" + mock_ldai = _make_ldai_client() + client = _make_client(mock_ldai) + + # Three attempts, 2 samples each (n=2); judge fails on every attempt so + # we cycle through all max_attempts and history never exceeds 2 items. 
+        opts = _make_gt_options(
+            handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)),
+            max_attempts=3,
+        )
+
+        with patch.object(client, "_generate_new_variation", new_callable=AsyncMock):
+            await client.optimize_from_ground_truth_options("test-agent", opts)
+
+        # _history should be capped at n=2 (the number of samples)
+        assert len(client._history) <= 2
+
+    @pytest.mark.asyncio
+    async def test_standard_history_trimmed_to_max_standard_length(self):
+        """After each standard optimizer iteration _history must not exceed _MAX_STANDARD_HISTORY_LENGTH."""
+        mock_ldai = _make_ldai_client()
+        client = _make_client(mock_ldai)
+
+        # Enough failing attempts to push history beyond the cap.
+        attempts = _MAX_STANDARD_HISTORY_LENGTH + 3
+        opts = _make_options(
+            handle_judge_call=AsyncMock(return_value=OptimizationResponse(output=JUDGE_FAIL_RESPONSE)),
+            max_attempts=attempts,
+        )
+
+        with patch.object(client, "_generate_new_variation", new_callable=AsyncMock):
+            await client.optimize_from_options("test-agent", opts)
+
+        assert len(client._history) <= _MAX_STANDARD_HISTORY_LENGTH
+
+
+# ---------------------------------------------------------------------------
+# Accumulated token usage
+# ---------------------------------------------------------------------------
+
+
+class TestAccumulatedTokenUsage:
+    """Tests that total token usage is tracked across agent, judge, and variation calls."""
+
+    def setup_method(self):
+        self.mock_ldai = _make_ldai_client()
+
+    @pytest.mark.asyncio
+    async def test_variation_tokens_counted_in_total(self):
+        """Tokens consumed by variation generation are rolled into _total_token_usage."""
+        agent_usage = TokenUsage(total=100, input=60, output=40)
+        variation_usage = TokenUsage(total=50, input=30, output=20)
+
+        # Build a minimal GT run (no validation phase) so we can control exactly
+        # which handle_agent_call responses map to agent turns vs variation.
+ # GT flow for one failed attempt then success: + # Attempt 1: 2 samples → 2 agent calls → judge fails on sample 1 → all_passed=False + # Variation: 1 handle_agent_call with variation_usage + # Attempt 2: 2 samples → 2 agent calls → all pass → success + agent_responses = [ + OptimizationResponse(output="Answer 1.", usage=agent_usage), # attempt 1, sample 1 + OptimizationResponse(output="Answer 2.", usage=agent_usage), # attempt 1, sample 2 + OptimizationResponse(output=VARIATION_RESPONSE, usage=variation_usage), # variation + OptimizationResponse(output="Answer 1.", usage=agent_usage), # attempt 2, sample 1 + OptimizationResponse(output="Answer 2.", usage=agent_usage), # attempt 2, sample 2 + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + judge_responses = [ + OptimizationResponse(output=JUDGE_FAIL_RESPONSE), # attempt 1, sample 1 — fail + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 1, sample 2 — pass + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 2, sample 1 — pass + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # attempt 2, sample 2 — pass + ] + handle_judge_call = AsyncMock(side_effect=judge_responses) + + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + max_attempts=3, + ) + await client.optimize_from_ground_truth_options("test-agent", opts) + + # _total_token_usage must include tokens from all 4 agent calls + 1 variation call + expected_min = agent_usage.total * 4 + variation_usage.total + assert client._total_token_usage >= expected_min + + @pytest.mark.asyncio + async def test_accumulated_token_usage_stamped_on_context(self): + """accumulated_token_usage on the returned context equals _total_token_usage.""" + agent_usage = TokenUsage(total=200, input=120, output=80) + handle_agent_call = AsyncMock( + return_value=OptimizationResponse(output="The answer is 4.", usage=agent_usage) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + + client = _make_client(self.mock_ldai) + opts = _make_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + ) + results = await client.optimize_from_options("test-agent", opts) + + assert results is not None + last_ctx = results if isinstance(results, OptimizationContext) else None + # Check the internal total includes at least the agent tokens + assert client._total_token_usage >= agent_usage.total + + @pytest.mark.asyncio + async def test_gt_accumulated_token_usage_on_final_success(self): + """On a passing GT run the last result's accumulated_token_usage == _total_token_usage.""" + agent_usage = TokenUsage(total=100, input=60, output=40) + handle_agent_call = AsyncMock( + return_value=OptimizationResponse(output="correct", usage=agent_usage) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert isinstance(results, list) and len(results) > 0 + final_ctx = results[-1] + assert final_ctx.accumulated_token_usage is not None + assert final_ctx.accumulated_token_usage == client._total_token_usage + + +# --------------------------------------------------------------------------- +# GT token budget exhausted on final successful iteration 
+# --------------------------------------------------------------------------- + + +class TestGTTokenBudgetOnFinalIteration: + """Token budget exhausted on the very last GT sample should mark the run successful + when all samples passed, and failed when mid-batch or a sample failed.""" + + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_all_passed_marks_success(self): + """Exhausting the budget on the final sample of a batch where all passed → success.""" + agent_responses = [ + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, # 100+200=300 ≥ 250 → budget hit on final (2nd) sample + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is True + assert len(results) == 2 + + @pytest.mark.asyncio + async def test_token_limit_on_last_sample_one_failed_marks_failure(self): + """Budget hit on the final sample but a prior sample failed → failure.""" + agent_responses = [ + # Sample 1 fails judge + OptimizationResponse(output="Answer 1.", usage=TokenUsage(total=100, input=60, output=40)), + # Sample 2 passes judge but pushes over limit + OptimizationResponse(output="Answer 2.", usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + judge_responses = [ + OptimizationResponse(output=JUDGE_FAIL_RESPONSE), # sample 1 fails + OptimizationResponse(output=JUDGE_PASS_RESPONSE), # sample 2 passes + ] + handle_judge_call = AsyncMock(side_effect=judge_responses) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + + @pytest.mark.asyncio + async def test_token_limit_mid_batch_marks_failure(self): + """Budget hit mid-batch (not on the final sample) → always failure.""" + handle_agent_call = AsyncMock( + return_value=OptimizationResponse( + output="Answer.", usage=TokenUsage(total=600, input=400, output=200) + ) + ) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=500, # 600 > 500 → trip on first of two samples + max_attempts=5, + ) + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert client._last_run_succeeded is False + assert len(results) == 1 + + @pytest.mark.asyncio + async def test_on_passing_result_called_on_budget_exhaustion_success(self): + """on_passing_result fires when token budget exhausted but all GT samples passed.""" + on_passing = MagicMock() + agent_responses = [ + OptimizationResponse(output="A1.", usage=TokenUsage(total=100, input=60, output=40)), + OptimizationResponse(output="A2.", 
usage=TokenUsage(total=200, input=120, output=80)), + ] + handle_agent_call = AsyncMock(side_effect=agent_responses) + handle_judge_call = AsyncMock( + return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE) + ) + client = _make_client(self.mock_ldai) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + handle_judge_call=handle_judge_call, + token_limit=250, + max_attempts=5, + on_passing_result=on_passing, + ) + await client.optimize_from_ground_truth_options("test-agent", opts) + + on_passing.assert_called_once() + + # --------------------------------------------------------------------------- # optimize_from_config # --------------------------------------------------------------------------- @@ -3518,43 +3826,43 @@ def test_returns_true_when_baseline_duration_is_none(self): assert self.client._evaluate_duration(self._ctx(5000, iteration=2)) is True def test_returns_true_when_candidate_duration_is_none(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(None, iteration=2)) is True def test_passes_when_candidate_is_more_than_20_percent_faster(self): # baseline=2000ms, threshold=1600ms, candidate=1500ms → 1500 < 1600 → pass - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1500, iteration=2)) is True def test_fails_when_candidate_is_exactly_at_threshold(self): # baseline=2000ms, threshold=1600ms, candidate=1600ms → not strictly less → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1600, iteration=2)) is False def test_fails_when_improvement_is_less_than_20_percent(self): # baseline=2000ms, threshold=1600ms, candidate=1800ms → 1800 >= 1600 → fail - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(1800, iteration=2)) is False def test_fails_when_candidate_matches_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2000, iteration=2)) is False def test_fails_when_candidate_is_slower_than_baseline(self): - self.client._history = [self._ctx(2000, iteration=1)] + self.client._baseline_duration_ms = 2000.0 assert self.client._evaluate_duration(self._ctx(2500, iteration=2)) is False - def test_uses_history_index_zero_as_baseline_not_last(self): - # history[0] is 2000ms (baseline), history[-1] is 500ms (fast, but not the baseline) - first = self._ctx(2000, iteration=1) - later = self._ctx(500, iteration=2) - self.client._history = [first, later] - # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses history[0], not history[-1]) + def test_uses_explicit_baseline_not_most_recent_history(self): + # _baseline_duration_ms is 2000ms even though the most recent history item has 500ms. + # The explicit baseline is what must be used, not any history item. 
+ self.client._baseline_duration_ms = 2000.0 + self.client._history = [self._ctx(2000, iteration=1), self._ctx(500, iteration=2)] + # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses explicit baseline, not history[-1]) assert self.client._evaluate_duration(self._ctx(1500, iteration=3)) is True def test_pass_boundary_just_below_threshold(self): # baseline=1000ms, threshold=800ms, candidate=799ms → pass - self.client._history = [self._ctx(1000, iteration=1)] + self.client._baseline_duration_ms = 1000.0 assert self.client._evaluate_duration(self._ctx(799, iteration=2)) is True @@ -3665,8 +3973,8 @@ async def test_no_duration_gate_when_acceptance_criteria_has_no_latency_keywords with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - # Manually seed history so _evaluate_duration would fire if incorrectly triggered - client._history = [self._ctx_with(duration_ms=2000, iteration=0)] + # Manually seed baseline so _evaluate_duration would fire if incorrectly triggered + client._baseline_duration_ms = 2000.0 result = await client.optimize_from_options("test-agent", opts) assert result is not None @@ -4831,6 +5139,7 @@ def _ctx(self, cost: float, iteration: int = 2) -> OptimizationContext: def _seed_history(self, baseline_cost: float): self.client._history = [self._ctx(baseline_cost, iteration=1)] + self.client._baseline_cost_usd = baseline_cost def test_passes_when_cost_improved_beyond_tolerance(self): self._seed_history(0.010) @@ -4838,13 +5147,14 @@ def test_passes_when_cost_improved_beyond_tolerance(self): def test_fails_when_cost_not_improved_enough(self): self._seed_history(0.010) - assert self.client._evaluate_cost(self._ctx(0.009)) is False + assert self.client._evaluate_cost(self._ctx(0.0095)) is False def test_passes_at_exact_tolerance_boundary(self): self._seed_history(0.010) - # 0.010 * 0.80 = 0.008; must be strictly less than 0.008 - assert self.client._evaluate_cost(self._ctx(0.0079)) is True - assert self.client._evaluate_cost(self._ctx(0.008)) is False + # 0.010 * 0.90 ≈ 0.009; must be strictly less than the threshold + assert self.client._evaluate_cost(self._ctx(0.0089)) is True + # 0.0091 is above the threshold → fail + assert self.client._evaluate_cost(self._ctx(0.0091)) is False def test_skips_gracefully_when_history_empty(self): self.client._history = [] @@ -4866,6 +5176,233 @@ def test_skips_gracefully_when_units_differ_across_model_switch(self): assert self.client._evaluate_cost(self._ctx(None)) is True +# --------------------------------------------------------------------------- +# _apply_duration_gate +# --------------------------------------------------------------------------- + + +class TestApplyDurationGate: + """Unit tests for the _apply_duration_gate wrapper method.""" + + def _make_judges_with_latency(self): + return { + "latency": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be faster and reduce latency.", + ) + } + + def _make_judges_no_latency(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, duration_ms=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + duration_ms=duration_ms, + ) + + def setup_method(self): + self.client = _make_client() + self.client._options = 
_make_options(judges=self._make_judges_with_latency()) + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._baseline_duration_ms = 2000.0 + + def test_no_entry_added_when_gate_not_active(self): + self.client._options = _make_options(judges=self._make_judges_no_latency()) + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" not in updated.scores + + def test_no_entry_added_when_already_failed(self): + ctx = self._ctx(1000) + passed, updated = self.client._apply_duration_gate(False, ctx) + assert passed is False + assert "_latency_gate" not in updated.scores + + def test_gate_pass_adds_score_1(self): + # baseline=2000ms, threshold=1600ms, candidate=1500ms → pass + ctx = self._ctx(1500) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is True + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 1.0 + assert "passed" in updated.scores["_latency_gate"].rationale.lower() + assert "1500" in updated.scores["_latency_gate"].rationale + assert "2000" in updated.scores["_latency_gate"].rationale + + def test_gate_fail_adds_score_0(self): + # baseline=2000ms, threshold=1600ms, candidate=1800ms → fail + ctx = self._ctx(1800) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert passed is False + assert "_latency_gate" in updated.scores + assert updated.scores["_latency_gate"].score == 0.0 + assert "failed" in updated.scores["_latency_gate"].rationale.lower() + assert "1800" in updated.scores["_latency_gate"].rationale + + def test_gate_pass_no_baseline_gives_fallback_rationale(self): + self.client._baseline_duration_ms = None + ctx = self._ctx(None) + passed, updated = self.client._apply_duration_gate(True, ctx) + assert "_latency_gate" in updated.scores + assert "no baseline" in updated.scores["_latency_gate"].rationale.lower() + + def test_existing_scores_are_preserved(self): + ctx = OptimizationContext( + scores={"accuracy": JudgeResult(score=1.0, rationale="ok")}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=2, + duration_ms=1500, + ) + _, updated = self.client._apply_duration_gate(True, ctx) + assert "accuracy" in updated.scores + assert "_latency_gate" in updated.scores + + def test_no_threshold_field_on_judge_result(self): + ctx = self._ctx(1500) + _, updated = self.client._apply_duration_gate(True, ctx) + gate_result = updated.scores["_latency_gate"] + assert not hasattr(gate_result, "threshold") or gate_result.threshold is None # type: ignore[union-attr] + + +# --------------------------------------------------------------------------- +# _apply_cost_gate +# --------------------------------------------------------------------------- + + +class TestApplyCostGate: + """Unit tests for the _apply_cost_gate wrapper method.""" + + def _make_judges_with_cost(self): + return { + "cost": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be cheaper and reduce cost.", + ) + } + + def _make_judges_no_cost(self): + return { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + + def _ctx(self, cost=None, iteration=2): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + 
+    def setup_method(self):
+        self.client = _make_client()
+        self.client._options = _make_options(judges=self._make_judges_with_cost())
+        self.client._initialize_class_members_from_config(_make_agent_config())
+        self.client._baseline_cost_usd = 0.010
+
+    def test_no_entry_added_when_gate_not_active(self):
+        self.client._options = _make_options(judges=self._make_judges_no_cost())
+        ctx = self._ctx(0.005)
+        passed, updated = self.client._apply_cost_gate(True, ctx)
+        assert passed is True
+        assert "_cost_gate" not in updated.scores
+
+    def test_no_entry_added_when_already_failed(self):
+        ctx = self._ctx(0.005)
+        passed, updated = self.client._apply_cost_gate(False, ctx)
+        assert passed is False
+        assert "_cost_gate" not in updated.scores
+
+    def test_gate_pass_adds_score_1(self):
+        # baseline=0.010, threshold=0.009, candidate=0.007 → pass
+        ctx = self._ctx(0.007)
+        passed, updated = self.client._apply_cost_gate(True, ctx)
+        assert passed is True
+        assert "_cost_gate" in updated.scores
+        assert updated.scores["_cost_gate"].score == 1.0
+        assert "passed" in updated.scores["_cost_gate"].rationale.lower()
+
+    def test_gate_fail_adds_score_0(self):
+        # baseline=0.010, threshold=0.009, candidate=0.0095 → fail
+        ctx = self._ctx(0.0095)
+        passed, updated = self.client._apply_cost_gate(True, ctx)
+        assert passed is False
+        assert "_cost_gate" in updated.scores
+        assert updated.scores["_cost_gate"].score == 0.0
+        assert "failed" in updated.scores["_cost_gate"].rationale.lower()
+
+    def test_gate_pass_no_baseline_gives_fallback_rationale(self):
+        self.client._baseline_cost_usd = None
+        ctx = self._ctx(None)
+        passed, updated = self.client._apply_cost_gate(True, ctx)
+        assert "_cost_gate" in updated.scores
+        assert "no baseline" in updated.scores["_cost_gate"].rationale.lower()
+
+    def test_existing_scores_are_preserved(self):
+        ctx = OptimizationContext(
+            scores={"accuracy": JudgeResult(score=1.0, rationale="ok")},
+            completion_response="response",
+            current_instructions="Do X.",
+            current_parameters={},
+            current_variables={},
+            iteration=2,
+            estimated_cost_usd=0.007,
+        )
+        _, updated = self.client._apply_cost_gate(True, ctx)
+        assert "accuracy" in updated.scores
+        assert "_cost_gate" in updated.scores
+
+    def test_both_gates_active_compose_cleanly(self):
+        """Duration + cost gate can both fire on the same context."""
+        self.client._options = _make_options(
+            judges={
+                "perf": OptimizationJudge(
+                    threshold=0.8,
+                    acceptance_statement="The response must be faster, reduce latency, and cheaper cost.",
+                )
+            }
+        )
+        self.client._baseline_duration_ms = 2000.0
+        self.client._baseline_cost_usd = 0.010
+        ctx = OptimizationContext(
+            scores={},
+            completion_response="response",
+            current_instructions="Do X.",
+            current_parameters={},
+            current_variables={},
+            iteration=2,
+            duration_ms=1500,
+            estimated_cost_usd=0.007,
+        )
+        passed, ctx = self.client._apply_duration_gate(True, ctx)
+        passed, ctx = self.client._apply_cost_gate(passed, ctx)
+        assert passed is True
+        assert "_latency_gate" in ctx.scores
+        assert "_cost_gate" in ctx.scores
+        assert ctx.scores["_latency_gate"].score == 1.0
+        assert ctx.scores["_cost_gate"].score == 1.0
+
+
 # ---------------------------------------------------------------------------
 # variation_prompt_cost_optimization
 # ---------------------------------------------------------------------------
@@ -4888,6 +5425,148 @@ def test_mentions_token_reduction(self):
         result = variation_prompt_cost_optimization(["gpt-4o"])
         assert "token" in result.lower()
+    # quality_already_passing=False (default) — standard framing
+    def test_default_framing_says_improve_quality_too(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False)
+        assert "In addition to improving quality" in result
+
+    def test_default_framing_does_not_mention_quality_passing(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=False)
+        assert "currently passing" not in result
+
+    # quality_already_passing=True — preserve-behavior framing
+    def test_passing_framing_says_criteria_already_passing(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True)
+        assert "currently passing" in result
+
+    def test_passing_framing_says_do_not_change_behavior(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True)
+        assert "preserve" in result.lower() or "do not" in result.lower() or "NOT" in result
+
+    def test_passing_framing_still_includes_token_guidance(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True)
+        assert "token" in result.lower()
+
+    def test_passing_framing_still_includes_model_choices(self):
+        result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"], quality_already_passing=True)
+        assert "gpt-4o" in result
+
+    def test_passing_framing_still_warns_quality_is_primary(self):
+        result = variation_prompt_cost_optimization(["gpt-4o"], quality_already_passing=True)
+        assert "primary objective" in result.lower() or "do not sacrifice" in result.lower()
+
+
+# ---------------------------------------------------------------------------
+# _all_judges_passing
+# ---------------------------------------------------------------------------
+
+
+class TestAllJudgesPassing:
+    def _ctx_with_scores(self, scores, iteration=1):
+        return OptimizationContext(
+            scores=scores,
+            completion_response="ok",
+            current_instructions="Do X.",
+            current_parameters={},
+            current_variables={},
+            iteration=iteration,
+        )
+
+    def setup_method(self):
+        self.client = _make_client()
+        self.client._initialize_class_members_from_config(_make_agent_config())
+
+    def test_returns_false_when_history_empty(self):
+        self.client._history = []
+        self.client._options = _make_options()
+        assert self.client._all_judges_passing() is False
+
+    def test_returns_false_when_no_judges(self):
+        self.client._options = _make_options(judges={})
+        self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="ok")})]
+        assert self.client._all_judges_passing() is False
+
+    def test_returns_false_when_judge_score_missing(self):
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+        })
+        self.client._history = [self._ctx_with_scores({})]
+        assert self.client._all_judges_passing() is False
+
+    def test_returns_true_when_all_judges_pass(self):
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+        })
+        self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.9, rationale="ok")})]
+        assert self.client._all_judges_passing() is True
+
+    def test_returns_false_when_one_judge_below_threshold(self):
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+        })
+        self.client._history = [self._ctx_with_scores({"accuracy": JudgeResult(score=0.7, rationale="bad")})]
+        assert self.client._all_judges_passing() is False
+
+    def test_returns_false_when_one_of_multiple_judges_fails(self):
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+            "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"),
+        })
+        self.client._history = [self._ctx_with_scores({
+            "accuracy": JudgeResult(score=1.0, rationale="ok"),
+            "tone": JudgeResult(score=0.5, rationale="failed"),
+        })]
+        assert self.client._all_judges_passing() is False
+
+    def test_returns_true_when_all_of_multiple_judges_pass(self):
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+            "tone": OptimizationJudge(threshold=0.9, acceptance_statement="friendly tone"),
+        })
+        self.client._history = [self._ctx_with_scores({
+            "accuracy": JudgeResult(score=1.0, rationale="ok"),
+            "tone": JudgeResult(score=0.95, rationale="ok"),
+        })]
+        assert self.client._all_judges_passing() is True
+
+    def test_gate_scores_do_not_affect_result(self):
+        """Synthetic _latency_gate / _cost_gate keys must not count as judge failures."""
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+        })
+        self.client._history = [self._ctx_with_scores({
+            "accuracy": JudgeResult(score=1.0, rationale="ok"),
+            "_latency_gate": JudgeResult(score=0.0, rationale="gate failed"),
+            "_cost_gate": JudgeResult(score=0.0, rationale="gate failed"),
+        })]
+        # Gate failures should not prevent _all_judges_passing from returning True
+        assert self.client._all_judges_passing() is True
+
+    def test_uses_most_recent_history_entry(self):
+        """Only the last history entry is inspected."""
+        self.client._options = _make_options(judges={
+            "accuracy": OptimizationJudge(threshold=0.8, acceptance_statement="accurate"),
+        })
+        self.client._history = [
+            self._ctx_with_scores({"accuracy": JudgeResult(score=0.5, rationale="early fail")}, iteration=1),
+            self._ctx_with_scores({"accuracy": JudgeResult(score=1.0, rationale="later pass")}, iteration=2),
+        ]
+        assert self.client._all_judges_passing() is True
+
+    def test_inverted_judge_passes_when_score_below_threshold(self):
+        self.client._options = _make_options(judges={
+            "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True),
+        })
+        self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.1, rationale="clean")})]
+        assert self.client._all_judges_passing() is True
+
+    def test_inverted_judge_fails_when_score_above_threshold(self):
+        self.client._options = _make_options(judges={
+            "toxicity": OptimizationJudge(threshold=0.2, acceptance_statement="low toxicity", is_inverted=True),
+        })
+        self.client._history = [self._ctx_with_scores({"toxicity": JudgeResult(score=0.5, rationale="toxic")})]
+        assert self.client._all_judges_passing() is False
+
 
 class TestBuildNewVariationPromptCost:
     def _make_history(self) -> list:
@@ -5064,6 +5743,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge):
             estimated_cost_usd=500.0,
         )
         self.client._history = [baseline_ctx]
+        self.client._baseline_cost_usd = 500.0
         self.client._options = _make_options(handle_judge_call=_capture_judge_call)
         await self.client._evaluate_acceptance_judge(
             judge_key="cost-judge",