…tend-refactor/migrate-apps-to-workflows
…fact queries Previously, querying folders with parent_id=None or artifacts with folder_id=None would not filter for root-level items. Now uses model_fields_set to detect when the field was explicitly set to None vs not provided, allowing proper filtering for items without a parent folder.
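The explicit-None-vs-omitted distinction can be sketched with Pydantic v2's `model_fields_set` (the `FolderQuery` model and `build_filter` helper below are hypothetical, for illustration only):

```python
from typing import Optional

from pydantic import BaseModel


class FolderQuery(BaseModel):
    # Hypothetical query model: parent_id=None may mean "root-level only"
    # or "no filter", depending on whether the caller set it explicitly.
    parent_id: Optional[str] = None


def build_filter(query: FolderQuery) -> dict:
    filters = {}
    # model_fields_set contains only fields the caller passed explicitly,
    # so parent_id=None (explicit) filters for root-level folders, while
    # an omitted parent_id applies no parent filter at all.
    if "parent_id" in query.model_fields_set:
        filters["parent_id"] = query.parent_id
    return filters


print(build_filter(FolderQuery(parent_id=None)))  # {'parent_id': None}
print(build_filter(FolderQuery()))                # {}
```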
…variants When a variant is archived, all its child revisions are now also soft-deleted so they no longer appear in revision queries that filter on deleted_at. Similarly, unarchiving a variant restores all its child revisions.
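A minimal in-memory sketch of the cascading soft-delete (the `Variant`/`Revision` dataclasses are illustrative stand-ins for the real entities):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Revision:
    id: str
    deleted_at: Optional[datetime] = None


@dataclass
class Variant:
    id: str
    deleted_at: Optional[datetime] = None
    revisions: List[Revision] = field(default_factory=list)


def archive_variant(variant: Variant) -> None:
    # Soft-delete the variant and cascade to child revisions so they
    # disappear from queries that filter on deleted_at IS NULL.
    now = datetime.now(timezone.utc)
    variant.deleted_at = now
    for revision in variant.revisions:
        revision.deleted_at = now


def unarchive_variant(variant: Variant) -> None:
    # Restore the variant and all of its child revisions.
    variant.deleted_at = None
    for revision in variant.revisions:
        revision.deleted_at = None
```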
Allows filtering environment revisions by application references. The filter returns only revisions where the specified application's deployment changed compared to the previous revision, enabling efficient querying of deployment history for a specific application.
The flags field was missing from RunUpdate calls in evaluate_batch_testset, evaluate_batch_invocation, and _evaluate_batch_items, causing flags to be lost when updating run status.
Updated references from legacyAppRevision to workflow entities across multiple design documents. The legacy entity system has been fully removed and replaced by workflow entities.
…d workflow variant molecule - ivt-built-in-search: Plan for built-in search functionality in IVT - migrate-state-entities: Gap analysis and migration plan for testset/testcase entities - workflow-variant-molecule: Plan for workflow variant molecule implementation
Pull request overview
Copilot reviewed 74 out of 1222 changed files in this pull request and generated 6 comments.
```diff
 response = await client.post(
     url,
-    json=payload,
+    json=request_body,
     headers=headers,
     timeout=900,
 )
 app_response = await response.json()
 response.raise_for_status()
```

```python
(
    value,
    kind,
    cost,
    tokens,
    latency,
```
This logs a full curl command including request headers. Since headers includes Authorization: Secret <token>, this will leak credentials into logs. Redact/remove sensitive headers (at least Authorization, cookies, API keys) before logging, or log only non-sensitive metadata (URL, scenario_id, testcase_id, request size).
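A possible shape for the redaction the comment asks for (the header names and the `<redacted>` placeholder are just one choice):

```python
# Headers whose values must never reach log output (lowercase for matching).
SENSITIVE_HEADERS = {"authorization", "cookie", "x-api-key"}


def redact_headers(headers: dict) -> dict:
    # Replace sensitive header values before logging the request.
    return {
        name: ("<redacted>" if name.lower() in SENSITIVE_HEADERS else value)
        for name, value in headers.items()
    }


headers = {"Authorization": "Secret abc123", "Content-Type": "application/json"}
print(redact_headers(headers))
# {'Authorization': '<redacted>', 'Content-Type': 'application/json'}
```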
```python
return url

parsed = urlparse(url)
hostname = parsed.hostname
```
Using urlparse() can fail to detect hosts for scheme-less inputs (e.g. localhost:8000 parses scheme='localhost' and hostname=None). That changes behavior vs the prior substring check and will skip localhost rewriting in those cases. Consider normalizing by prepending a default scheme when missing (e.g. http://) or adding a fallback hostname extraction when parsed.hostname is None.
Suggested change:

```diff
 hostname = parsed.hostname
+
+# Fallback for scheme-less URLs like "localhost:8000", where urlparse
+# sets scheme="localhost" and hostname=None. In that case, re-parse
+# with a default scheme to recover the hostname.
+if hostname is None and "://" not in url:
+    fallback_parsed = urlparse(f"http://{url}")
+    if fallback_parsed.hostname:
+        hostname = fallback_parsed.hostname
```
```python
catalog = [
    _enrich_entry(
        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
    )
    for entry in get_all_catalog_templates()
]
```
Building the full catalog at import time performs SDK registry calls and deep-copies upfront during app startup. This can slow cold starts and makes the catalog effectively static for the process lifetime. Consider lazy initialization (build on first access) and/or caching with an explicit refresh mechanism, especially if templates can change at runtime.
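One way to get lazy initialization with an explicit refresh hook, sketched with `functools.lru_cache` (the stub `get_all_catalog_templates` stands in for the real SDK registry call):

```python
from functools import lru_cache


def get_all_catalog_templates():
    # Stand-in for the real SDK registry call.
    return [{"key": "exact_match"}, {"key": "contains_json"}]


@lru_cache(maxsize=1)
def get_catalog() -> tuple:
    # Built on first access instead of at import time; cache_clear()
    # provides an explicit refresh mechanism if templates can change.
    return tuple(get_all_catalog_templates())


catalog = get_catalog()   # first call builds the catalog
catalog is get_catalog()  # later calls reuse the cached value (True)
get_catalog.cache_clear() # explicit refresh: next call rebuilds
```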
```python
# Filter by application_refs if provided
# This filters revisions where the specified app's deployment CHANGED
# compared to the previous revision (or is the first deployment)
if application_refs:
    app_ids = {str(ref.id) for ref in application_refs if ref.id}
    if app_ids:
        filtered_dbes = []
        prev_app_revision_ids: dict[str, str | None] = {
            app_id: None for app_id in app_ids
        }

        # Revisions are ordered descending (newest first)
        # We need to process in ascending order to detect changes
        for dbe in reversed(revision_dbes):
            if not dbe.data or not isinstance(dbe.data, dict):
                continue
```
This application_refs filter happens in Python after fetching revision_dbes from the database, which can be costly for large revision histories (memory + CPU) and undermines windowing/pagination semantics. Prefer pushing this filtering into SQL (e.g., JSONB path queries and/or a separate indexable column for referenced application revisions), or structure the query so you fetch only the needed window before doing in-memory change-detection.
```diff
 "default": "correct_answer",
 "type": "string",
 "required": False,
-"advanced": True,  # Tells the frontend that this setting is advanced and should be hidden by default
-"ground_truth_key": True,  # Tells the frontend that is the name of the column in the testset that should be shown as a ground truth to the user
+"x-ag-ui-advanced": True,
 "description": "The name of the column in the test data that contains the correct answer",
```
The change from advanced → x-ag-ui-advanced and removal of ground_truth_key is potentially breaking for any frontend/client code still reading the old keys (the removed comments indicate this was actively used by the frontend). If backwards compatibility is required, consider emitting both fields during a transition period or updating clients in the same PR so the API contract remains consistent.
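A transition-period shim could emit both keys; this is a sketch only, with a hypothetical helper name:

```python
def with_legacy_ui_keys(setting: dict) -> dict:
    # During a transition period, emit both the new x-ag-ui-* key and the
    # legacy key so older frontends that read "advanced" keep working.
    out = dict(setting)
    if "x-ag-ui-advanced" in out:
        out.setdefault("advanced", out["x-ag-ui-advanced"])
    return out


setting = {"default": "correct_answer", "x-ag-ui-advanced": True}
print(with_legacy_ui_keys(setting))
# {'default': 'correct_answer', 'x-ag-ui-advanced': True, 'advanced': True}
```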
```python
) -> List[Dict[str, Any]]:
    """
    Recursively finds all occurrences of a specific key in a nested dictionary.

    :param data: The dictionary to search.
    :param target_key: The key to find.
    :param path: The current path in the dictionary (for tracking locations).
    :return: A list of dictionaries containing 'path' and 'value' for each occurrence.
    """
    results = []

    if isinstance(data, dict):  # If it's a dictionary, traverse it
        for key, value in data.items():
            new_path = f"{path}.{key}" if path else key  # Update path
            if key == target_key:
                results.extend(value)  # Store match
```
The function’s docstring says it returns {path, value} entries, but it actually returns results.extend(value) and never records paths. Also, extend(value) will behave incorrectly if value is a string/dict (it will extend by characters/keys) and will raise if value is non-iterable. If the intended return type is list[str] (e.g., input_keys), update the signature/docstring accordingly and only extend when value is a list; otherwise append/coerce safely.
Suggested change:

```diff
-) -> List[Dict[str, Any]]:
-    """
-    Recursively finds all occurrences of a specific key in a nested dictionary.
-
-    :param data: The dictionary to search.
-    :param target_key: The key to find.
-    :param path: The current path in the dictionary (for tracking locations).
-    :return: A list of dictionaries containing 'path' and 'value' for each occurrence.
-    """
-    results = []
-
-    if isinstance(data, dict):  # If it's a dictionary, traverse it
-        for key, value in data.items():
-            new_path = f"{path}.{key}" if path else key  # Update path
-            if key == target_key:
-                results.extend(value)  # Store match
+) -> List[Any]:
+    """
+    Recursively finds all occurrences of a specific key in a nested dictionary.
+
+    :param data: The dictionary to search.
+    :param target_key: The key to find.
+    :param path: The current path in the dictionary (for tracking locations).
+    :return: A list of values associated with each occurrence of the target key.
+    """
+    results: List[Any] = []
+
+    if isinstance(data, dict):  # If it's a dictionary, traverse it
+        for key, value in data.items():
+            new_path = f"{path}.{key}" if path else key  # Update path
+            if key == target_key:
+                # When the target key is found, collect its value(s) safely.
+                if isinstance(value, list):
+                    results.extend(value)
+                else:
+                    results.append(value)
```
[feat] Extend `runs` and `queues` with multi-inputs, caching, splitting
Pull request overview
Copilot reviewed 75 out of 1132 changed files in this pull request and generated 5 comments.
```python
catalog = [
    _enrich_entry(
        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
```
_evaluator_metadata_by_key() is recomputed for every catalog entry, which is unnecessarily expensive (deepcopy + dict build per iteration) and scales poorly with catalog size. Compute the metadata map once (e.g., assign to a local variable before the list comprehension) and reuse it.
Suggested change:

```diff
-catalog = [
-    _enrich_entry(
-        entry, evaluator_metadata=_evaluator_metadata_by_key().get(entry["key"])
+_evaluator_metadata_map = _evaluator_metadata_by_key()
+catalog = [
+    _enrich_entry(
+        entry, evaluator_metadata=_evaluator_metadata_map.get(entry["key"])
```
```python
# APPLICATION CATALOG ------------------------------------------------------


@intercept_exceptions()
@intercept_exceptions()
```
The list_application_catalog_types endpoint is decorated with @intercept_exceptions() twice, which can lead to duplicated interception/handling (and duplicated logging). Remove one of the decorators.
Suggested change:

```diff
 @intercept_exceptions()
-@intercept_exceptions()
```
```python
if app_ref and str(app_ref.get("id")) == app_id:
    app_revision = ref_data.get("application_revision")
    if app_revision:
```
app_ref is used as a dict (app_ref.get(...)) without validating its type. If application is ever stored as a non-dict (e.g., a Pydantic model or string), this will raise at runtime. Add an isinstance(app_ref, dict) guard (and similarly for application_revision) before using .get().
Suggested change:

```diff
-if app_ref and str(app_ref.get("id")) == app_id:
-    app_revision = ref_data.get("application_revision")
-    if app_revision:
+if isinstance(app_ref, dict) and str(app_ref.get("id")) == app_id:
+    app_revision = ref_data.get("application_revision")
+    if isinstance(app_revision, dict):
```
```python
def _actual_lock_name(lock_key: str) -> str:
    return caching._pack(namespace="lock", key=lock_key)
```
This relies on caching._pack, which is a private helper (leading underscore) and increases coupling to internal implementation details. Prefer a public helper for key generation (or move the required packing logic into this module) so changes in oss.src.utils.caching internals don’t break runtime locking.
Suggested change:

```diff
-    return caching._pack(namespace="lock", key=lock_key)
+    return caching.pack(namespace="lock", key=lock_key)
```
```python
csvdata = None
with open(str(json_path)) as f:
    try:
        csvdata = json.loads(f.read())
    except json.JSONDecodeError as e:
        raise ValueError(f"Could not parse JSON file: {json_path}") from e
    except Exception as e:
        raise ValueError(f"Could not read JSON file: {json_path}") from e
```
Use json.load(f) rather than json.loads(f.read()), and specify an explicit encoding (commonly UTF-8) when opening JSON files. This avoids potential encoding-dependent failures and is the standard pattern for JSON file reads.
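The suggested pattern, as a self-contained sketch (`read_json` is an illustrative name):

```python
import json
import tempfile


def read_json(json_path: str):
    # json.load streams from the file object, and an explicit encoding
    # avoids platform-dependent default encodings.
    with open(json_path, encoding="utf-8") as f:
        try:
            return json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Could not parse JSON file: {json_path}") from e


# Demo with a throwaway file:
with tempfile.NamedTemporaryFile(
    "w", suffix=".json", delete=False, encoding="utf-8"
) as tmp:
    tmp.write('{"testcases": [1, 2, 3]}')
    path = tmp.name

print(read_json(path))  # {'testcases': [1, 2, 3]}
```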
Pull request overview
Copilot reviewed 74 out of 1131 changed files in this pull request and generated 6 comments.
```diff
 class ApplicationFlags(WorkflowFlags):
-    """Application flags - is_evaluator=False means it's an application."""
+    """Application flags - is_application=True, is_evaluator=False."""

     def __init__(self, **data):
         # Applications have is_evaluator=False (forced)
-        data["is_evaluator"] = False
+        data["is_application"] = True

         super().__init__(**data)


 class ApplicationQueryFlags(WorkflowQueryFlags):
-    """Application query flags - filter for is_evaluator=False."""
+    """Application query flags - filter for is_application=True, is_evaluator=False."""

     def __init__(self, **data):
         # Query for non-evaluators (applications) (forced)
-        data["is_evaluator"] = False
+        data["is_application"] = True

         super().__init__(**data)
```
Both ApplicationFlags and ApplicationQueryFlags claim is_evaluator=False but no longer enforce it. Callers could accidentally set is_evaluator=True, creating ambiguous/invalid state (or incorrect filtering). Enforce data[\"is_evaluator\"] = False (or explicitly drop/override any incoming is_evaluator) in both initializers.
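A sketch of the enforced invariant, using a simplified stand-in for the real `WorkflowFlags` base class:

```python
class WorkflowFlags:
    # Simplified stand-in for the real base class.
    def __init__(self, **data):
        self.is_evaluator = data.get("is_evaluator", False)
        self.is_application = data.get("is_application", False)


class ApplicationFlags(WorkflowFlags):
    """Application flags - is_application=True, is_evaluator=False (both forced)."""

    def __init__(self, **data):
        # Override any caller-provided values so the invariant always holds,
        # even if a caller mistakenly passes is_evaluator=True.
        data["is_evaluator"] = False
        data["is_application"] = True
        super().__init__(**data)


flags = ApplicationFlags(is_evaluator=True)  # caller mistake is corrected
print(flags.is_evaluator, flags.is_application)  # False True
```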
```python
if has_error:
    log.warn(
        "There is an error in evaluator %s for query %s.",
        annotation_step_key,
        query_trace_id,
    )
    step_status = EvaluationStatus.FAILURE
    scenario_has_errors[idx] += 1
    scenario_status[idx] = EvaluationStatus.ERRORS
```
log.warn is deprecated (and may not exist depending on the logger implementation). Also, passing printf-style positional arguments is inconsistent with surrounding structured logging usage (log.info(..., key=value)), and can raise at runtime with some loggers. Use log.warning(...) and either an f-string message or structured fields (e.g., evaluator_key=..., query_trace_id=...).
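With the stdlib logger, one consistent style could look like this (a sketch; the project's structured logger may instead take keyword fields directly):

```python
import logging

log = logging.getLogger("evaluations")


def report_evaluator_error(annotation_step_key: str, query_trace_id: str) -> None:
    # log.warning (not the deprecated log.warn), with one consistent style:
    # a plain message plus structured context via the stdlib `extra` dict,
    # which attaches the fields to the LogRecord for formatters/handlers.
    log.warning(
        "error in evaluator",
        extra={
            "evaluator_key": annotation_step_key,
            "query_trace_id": query_trace_id,
        },
    )
```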
```python
log.warning(
    "No results found for step_key: %s",
    step_key,
    run_id=run_id,
    scenario_id=scenario_id,
    timestamp=timestamp,
    interval=interval,
)
```
This mixes printf-style formatting args with structured keyword fields. Depending on the logger implementation, this can throw (positional args not supported) or drop/mangle fields. Use a single style consistently: either format the message yourself (no positional args) or log a plain message and put step_key in structured fields.
```python
log.info(
    "retrieve_environment_revision: environment_ref=%r environment_variant_ref=%r environment_revision_ref=%r resolve=%r",
    environment_ref,
    environment_variant_ref,
    environment_revision_ref,
    resolve,
)
```
This uses positional printf-style logging arguments, while the surrounding codebase uses structured logging (e.g., log.info(msg, key=value)). If get_module_logger returns a structured logger, this can raise at runtime or produce poorly formatted logs. Prefer log.info(\"retrieve_environment_revision\", environment_ref=..., environment_variant_ref=..., environment_revision_ref=..., resolve=...) (or equivalent structured fields).
```python
    return caching._pack(namespace="lock", key=lock_key)


def _actual_meta_name(lock_key: str) -> str:
    return f"{_actual_lock_name(lock_key)}:meta"
```
This relies on a private caching API (caching._pack). Private helpers are not stable and can change without notice, which makes the lock implementation fragile. Expose a supported public helper in oss.src.utils.caching for packing lock keys (or reuse the same public packing code path used by acquire_lock), and use that here.
Suggested change:

```diff
-    return caching._pack(namespace="lock", key=lock_key)
+    # Use the public caching helper for packing lock keys instead of the private _pack API.
+    return caching.pack_lock_key(lock_key)


 def _actual_meta_name(lock_key: str) -> str:
     return f"{_actual_lock_name(lock_key)}:meta"
```
```python
# If this app's deployment changed, include this revision
if current_revision_id != prev_app_revision_ids[app_id]:
    if dbe not in filtered_dbes:
        filtered_dbes.append(dbe)
    prev_app_revision_ids[app_id] = current_revision_id
```
if dbe not in filtered_dbes is an O(n) membership check inside nested loops, making this section O(n^2) for many revisions/apps. Track included revisions by id (or by id(dbe)), using a set alongside the list, to keep membership checks O(1).
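The O(1)-membership variant, sketched on plain dicts rather than the real `dbe` objects:

```python
def filter_changed(revisions, app_ids):
    # Track included revision ids in a set so the membership check is O(1)
    # instead of scanning the result list for every candidate.
    filtered = []
    seen_ids = set()
    prev = {app_id: None for app_id in app_ids}
    for rev in revisions:  # assumed ascending order (oldest first)
        for app_id in app_ids:
            current = rev["apps"].get(app_id)
            if current != prev[app_id]:
                if rev["id"] not in seen_ids:
                    seen_ids.add(rev["id"])
                    filtered.append(rev)
                prev[app_id] = current
    return filtered


revs = [
    {"id": "r1", "apps": {"a": "v1"}},
    {"id": "r2", "apps": {"a": "v1"}},
    {"id": "r3", "apps": {"a": "v2"}},
]
print([r["id"] for r in filter_changed(revs, {"a"})])  # ['r1', 'r3']
```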
Pull request overview
Copilot reviewed 77 out of 1134 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
api/oss/src/core/evaluations/tasks/live.py:1

- When `has_error` is True (evaluator invocation returns non-200), the flow still appends a result (good), but `trace_id` can be None/empty, and the cached branch pre-populates only SUCCESS results. This leads to inconsistent behavior: failures may end up stored without a valid trace_id, while cached hits always store SUCCESS even if the reused trace is actually an ERROR/FAILURE from a prior run. Consider (mandatory) (1) storing status/error for cached reuse by inspecting the reused trace status, or persisting a stable evaluation-result status alongside the trace when caching, and (2) ensuring failures always produce a consistent result payload (including an error, and optionally leaving trace_id unset), while relaxing the `len(results)` assertion to count only successfully persisted results or making create_results accept/return failures without trace_id.

```python
from typing import Dict, Any, Optional
```
```python
def _lock_args(lock_key: str) -> tuple[str, str]:
    namespace, key = lock_key.split(":", 1)
    return namespace, key


def _actual_lock_name(lock_key: str) -> str:
    return caching._pack(namespace="lock", key=lock_key)
```
The lock key packing appears inconsistent: _actual_lock_name() assumes locks live under namespace=\"lock\" with key=lock_key, but _acquire_lock() passes namespace=<prefix of lock_key> (e.g. "eval") and key=<rest> into caching.acquire_lock. If acquire_lock/renew_lock/release_lock derive the Redis key from the passed namespace/key (typical), metadata reads/scans using _actual_lock_name() will not find the actual lock and has_mutation_lock / is_run_executing will be incorrect. Mandatory fix: align all lock operations and metadata/scans to the same packing strategy (either always call caching lock helpers with namespace=\"lock\", key=lock_key or update _actual_lock_name() / scan patterns to match the namespace/key scheme used by caching).
```python
    *,
    lock_key: str,
    job_type: JobType,
    job_id: str,
    ttl: int,
) -> Optional[LockPayload]:
    namespace, key = _lock_args(lock_key)
    job_token = await caching.acquire_lock(
        namespace=namespace,
        key=key,
        ttl=ttl,
        strict=True,
    )
```
The lock key packing appears inconsistent: _actual_lock_name() assumes locks live under namespace=\"lock\" with key=lock_key, but _acquire_lock() passes namespace=<prefix of lock_key> (e.g. "eval") and key=<rest> into caching.acquire_lock. If acquire_lock/renew_lock/release_lock derive the Redis key from the passed namespace/key (typical), metadata reads/scans using _actual_lock_name() will not find the actual lock and has_mutation_lock / is_run_executing will be incorrect. Mandatory fix: align all lock operations and metadata/scans to the same packing strategy (either always call caching lock helpers with namespace=\"lock\", key=lock_key or update _actual_lock_name() / scan patterns to match the namespace/key scheme used by caching).
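The alignment the comment asks for can be illustrated in miniature: one packing function used by every lock path, so acquisition and metadata scans compute the same key (`pack_key` and `LOCK_NAMESPACE` are hypothetical, not the real caching API):

```python
LOCK_NAMESPACE = "lock"


def pack_key(namespace: str, key: str) -> str:
    # Single source of truth for how a namespace/key pair becomes a Redis key.
    return f"{namespace}:{key}"


def actual_lock_name(lock_key: str) -> str:
    # Used by metadata reads/scans.
    return pack_key(LOCK_NAMESPACE, lock_key)


def acquire_lock_name(lock_key: str) -> str:
    # Must use the SAME packing as actual_lock_name(); splitting lock_key
    # into its own namespace/key pair here would produce a different key,
    # and has_mutation_lock / is_run_executing would then miss the lock.
    return pack_key(LOCK_NAMESPACE, lock_key)


assert actual_lock_name("eval:run:123") == acquire_lock_name("eval:run:123")
```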
```python
# Reverse back to descending order (newest first)
filtered_dbes.reverse()
revision_dbes = filtered_dbes
```
Filtering by application_refs is applied after executing the SQL query and after any DB-side windowing/limits have already been applied, which can produce under-filled pages (e.g., limit=50 returns fewer) and inconsistent pagination semantics. Mandatory fix: either (a) move this filter into the SQL layer (preferred, even if it requires JSON operators), or (b) implement a compensating strategy (fetch extra rows until you fill the window or you exhaust results) so windowing.limit remains meaningful.
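Option (b), the fetch-until-filled strategy, as a small generic sketch (`fetch_page` and `matches` are placeholders for the real query and filter):

```python
def fetch_filtered_window(fetch_page, matches, limit):
    # Compensating strategy: keep fetching DB pages and applying the
    # in-memory filter until the window is filled or the source is
    # exhausted, so `limit` keeps its meaning for callers.
    # fetch_page(offset, size) -> list of rows.
    out, offset = [], 0
    while len(out) < limit:
        page = fetch_page(offset, limit)
        if not page:
            break
        out.extend(row for row in page if matches(row))
        offset += len(page)
    return out[:limit]


rows = list(range(20))
result = fetch_filtered_window(
    lambda offset, size: rows[offset : offset + size],
    lambda r: r % 2 == 0,
    5,
)
print(result)  # [0, 2, 4, 6, 8]
```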
```diff
-status_code=status.HTTP_201_CREATED,
+status_code=status.HTTP_202_ACCEPTED,
 response_model=TraceIdResponse,
 response_model_exclude_none=True,
```
Changing POST /tracing/ (create_trace) from a synchronous creation semantics/HTTP 201 to HTTP 202 + TraceIdResponse is a behavioral breaking change for API consumers expecting immediate persistence and/or the prior response shape. Mandatory follow-up: either version this endpoint (new path or header versioning), keep backward-compatible behavior (201 + prior response model) alongside the new async variant, or clearly document and enforce the async contract (including whether a created trace is immediately queryable).
Suggested change:

```diff
 response_model_exclude_none=True,
+summary="Asynchronously create a trace",
+description=(
+    "Asynchronously enqueues creation of a new trace and returns a TraceIdResponse. "
+    "This endpoint returns HTTP 202 Accepted when the request has been accepted for "
+    "processing, but the trace may not be immediately persisted or queryable. "
+    "Clients must not assume that the created trace is available for retrieval or "
+    "query operations immediately after this call completes."
+),
```
No description provided.