diff --git a/changelog.d/1037.added b/changelog.d/1037.added new file mode 100644 index 000000000..dc790375c --- /dev/null +++ b/changelog.d/1037.added @@ -0,0 +1 @@ +Added the Stage 5 release promotion contract and runtime manifest output. diff --git a/docs/engineering/pipeline-map.md b/docs/engineering/pipeline-map.md index 6b50c8949..b70d3156e 100644 --- a/docs/engineering/pipeline-map.md +++ b/docs/engineering/pipeline-map.md @@ -1395,6 +1395,14 @@ class ReleasePromotionContext Canonical run, candidate, release, and destination identity for Stage 5. +### `policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder` + +```python +class ReleasePromotionContractBuilder +``` + +Build a Stage 5 contract from candidate identity and promotion results. + ### `modal_app.local_area._resolve_scope_fingerprint` ```python diff --git a/docs/engineering/stages/release_promotion.md b/docs/engineering/stages/release_promotion.md index 9c9778ac5..0276634bd 100644 --- a/docs/engineering/stages/release_promotion.md +++ b/docs/engineering/stages/release_promotion.md @@ -121,3 +121,29 @@ cleanup `status` as `skipped`, `completed`, or `failed` on `CleanupPromotionResult`. Later contract, index, diagnostics, and status writers should read this typed material instead of scraping logs or reconstructing public paths independently. + +## Release Promotion Contract + +Stage 5 writes `release_promotion_contract.json` under the run-local +`diagnostics/contracts/` directory after the promotion transaction succeeds and +before the Stage 5 step manifest is completed. The contract is the semantic +record for the Stage 5 boundary: it ties the canonical `run_id`, candidate +identity, Stage 4 output contract reference when available, validation report +paths, public Hugging Face and GCS refs, cleanup status, and typed +`FullPromotionResult` into one durable `StageContract`. + +The contract complements the public release files instead of replacing them: + +- `release_manifest.json` and `releases/{version}/release_manifest.json` remain + the public artifact inventory for the stable release. +- `version_manifest.json` remains the public version registry used by clients + and publication checks. +- `releases/{version}/release-complete.json` remains the final completion + marker and tag target proving the release was fully finalized. +- `release_promotion_contract.json` remains run-scoped diagnostics material for + dashboards, AI agents, rerun comparison, and promotion auditability. + +Runtime step manifests for `5_validate_and_promote_release` should include the +contract as a JSON `contract` output. They may still record legacy validated +input artifacts for compatibility, but the contract is the preferred semantic +entry point for Stage 5 status and lineage. diff --git a/docs/generated/pipeline_api.json b/docs/generated/pipeline_api.json index 08561c8a6..66c912d4c 100644 --- a/docs/generated/pipeline_api.json +++ b/docs/generated/pipeline_api.json @@ -1232,7 +1232,7 @@ "docstring": "Typed result for a full Stage 5 release promotion transaction.", "id": "full_promotion_result", "kind": "class", - "line": 62, + "line": 63, "metadata": { "api_refs": [ "policyengine_us_data.release_promotion.results.FullPromotionResult", @@ -3086,7 +3086,7 @@ "docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.", "id": "promote_pipeline_run", "kind": "function", - "line": 1910, + "line": 2079, "metadata": { "api_refs": [ "modal_app.pipeline.promote_run" @@ -3347,6 +3347,40 @@ "signature": "class ReleasePromotionContext", "source_file": "policyengine_us_data/release_promotion/context.py" }, + "release_promotion_contract_builder": { + "docstring": "Build a Stage 5 contract from candidate identity and promotion results.", + "id": "release_promotion_contract_builder", + "kind": "class", + "line": 71, + "metadata": { + "api_refs": [ + "policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder" + ], + "artifacts_in": [ + "release candidate bundle", + "typed promotion result" + ], + "artifacts_out": [ + "release_promotion_contract.json" + ], + "description": "Build the canonical Stage 5 release promotion contract.", + "id": "release_promotion_contract_builder", + "label": "ReleasePromotionContractBuilder", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/contract.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_contract.py" + ] + }, + "object_path": "policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder", + "signature": "class ReleasePromotionContractBuilder", + "source_file": "policyengine_us_data/release_promotion/contract.py" + }, "resolve_scope_fingerprint": { "docstring": "Compute the scope fingerprint while preserving pinned resume values.", "id": "resolve_scope_fingerprint", @@ -3507,7 +3541,7 @@ "docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.", "id": "run_modal_pipeline", "kind": "function", - "line": 943, + "line": 1112, "metadata": { "api_refs": [ "modal_app.pipeline.run_pipeline" @@ -4387,7 +4421,7 @@ "docstring": "Verify deployed-image imports and subprocess seams.", "id": "verify_runtime_seams", "kind": "function", - "line": 569, + "line": 738, "metadata": { "api_refs": [ "modal_app.pipeline.verify_runtime_seams" diff --git a/docs/generated/pipeline_map.json b/docs/generated/pipeline_map.json index 612344f3d..265bb9edc 100644 --- a/docs/generated/pipeline_map.json +++ b/docs/generated/pipeline_map.json @@ -1504,6 +1504,31 @@ "uv run pytest tests/unit/release_promotion/test_candidate.py" ] }, + { + "api_refs": [ + "policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder" + ], + "artifacts_in": [ + "release candidate bundle", + "typed promotion result" + ], + "artifacts_out": [ + "release_promotion_contract.json" + ], + "description": "Build the canonical Stage 5 release promotion contract.", + "id": "release_promotion_contract_builder", + "label": "ReleasePromotionContractBuilder", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/contract.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_contract.py" + ] + }, { "api_refs": [ "policyengine_us_data.build_outputs.fingerprinting.FingerprintingService", @@ -1971,9 +1996,9 @@ } ], "metadata": { - "api_node_count": 95, + "api_node_count": 96, "canonical_stage_count": 5, - "decorated_object_count": 153, + "decorated_object_count": 154, "mapped_decorated_node_count": 58, "stage_count": 17, "substage_count": 17 diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index e71b44d09..be0a3fcf7 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -549,6 +549,175 @@ def _promote_full_release_from_staging( ) +def _promotion_result_from_stdout(promotion_stdout: str): + """Parse typed promotion results from the promotion subprocess output.""" + + from policyengine_us_data.release_promotion import parse_full_promotion_result_json + + try: + return parse_full_promotion_result_json(promotion_stdout) + except ValueError as exc: + raise RuntimeError( + "Full release promotion subprocess did not return a valid " + "typed promotion result." + ) from exc + + +def _release_promotion_context_from_run_context(run_context: RunContext): + """Build the Stage 5 library context from the orchestration run context.""" + + from policyengine_us_data.release_promotion import ReleasePromotionContext + + return ReleasePromotionContext( + run_id=run_context.run_id, + candidate_version=run_context.candidate_version, + release_version=run_context.release_version, + hf_repo_name="policyengine/policyengine-us-data", + gcs_bucket_name="policyengine-us-data", + base_release_version=run_context.base_release_version or None, + release_bump=run_context.release_bump or None, + modal_app_name=run_context.modal_app_name or None, + modal_environment=run_context.modal_environment or None, + hf_staging_prefix=run_context.hf_staging_prefix or None, + metadata={"run_context": run_context.to_dict()}, + ) + + +def _release_artifact_metadata_by_path( + run_id: str, + rel_paths: list[str], +) -> dict[str, dict[str, object]]: + """Return local checksum/size metadata for staged release artifacts.""" + + metadata: dict[str, dict[str, object]] = {} + for local_path, rel_path in _full_release_manifest_files(run_id, rel_paths): + path = Path(local_path) + if not path.exists() or not path.is_file(): + continue + reference = ArtifactReference.from_path(path) + metadata[rel_path] = { + "sha256": f"sha256:{reference.sha256}", + "size_bytes": reference.size_bytes, + } + return metadata + + +def _stage4_output_contract_repo_path_if_available(run_id: str) -> str | None: + """Return the run-repo path for the Stage 4 contract when it exists locally.""" + + run_dir = _run_dir(run_id) + candidates = ( + run_dir / "diagnostics" / "contracts" / "output_build_contract.json", + run_dir / "contracts" / "output_build_contract.json", + run_dir / "output_build_contract.json", + ) + for path in candidates: + if path.exists() and path.is_file(): + return f"calibration/runs/{run_id}/{path.relative_to(run_dir).as_posix()}" + return None + + +RUN_DIAGNOSTICS_VALIDATION_REPORT_FILENAMES = ( + "validation_report.json", + "validation_summary.json", + "validation_results.csv", + "national_validation.txt", +) +RUN_DIAGNOSTICS_MANIFEST_FILENAMES = ( + "manifest.json", + "diagnostics_manifest.json", +) + + +def _run_diagnostics_repo_path_if_available(run_id: str, filename: str) -> str | None: + """Return the repo path for a run-local diagnostics file when present.""" + + run_dir = _run_dir(run_id) + path = run_dir / "diagnostics" / filename + if not path.exists() or not path.is_file(): + return None + return f"calibration/runs/{run_id}/{path.relative_to(run_dir).as_posix()}" + + +def _run_validation_report_repo_paths_if_available(run_id: str) -> list[str]: + """Return uploaded-run paths for validation diagnostics available locally.""" + + return [ + repo_path + for filename in RUN_DIAGNOSTICS_VALIDATION_REPORT_FILENAMES + if ( + repo_path := _run_diagnostics_repo_path_if_available( + run_id, + filename, + ) + ) + ] + + +def _run_diagnostics_manifest_repo_path_if_available(run_id: str) -> str | None: + """Return the run diagnostics manifest path when one exists locally.""" + + for filename in RUN_DIAGNOSTICS_MANIFEST_FILENAMES: + repo_path = _run_diagnostics_repo_path_if_available(run_id, filename) + if repo_path is not None: + return repo_path + return None + + +def _write_release_promotion_contract_for_run( + *, + meta: RunMetadata, + run_context: RunContext, + rel_paths: list[str], + promotion_result, +) -> ArtifactReference: + """Write Stage 5's run-local contract and return its manifest reference.""" + + from policyengine_us_data.release_promotion import ( + build_legacy_release_candidate_bundle, + release_promotion_contract_path, + write_release_promotion_contract, + ) + + run_dir = _run_dir(run_context.run_id) + contract_path = release_promotion_contract_path(run_dir) + candidate_bundle = build_legacy_release_candidate_bundle( + context=_release_promotion_context_from_run_context(run_context), + rel_paths=rel_paths, + artifact_metadata_by_path=_release_artifact_metadata_by_path( + run_context.run_id, + rel_paths, + ), + source_output_contract_path=_stage4_output_contract_repo_path_if_available( + run_context.run_id + ), + validation_report_paths=_run_validation_report_repo_paths_if_available( + run_context.run_id + ), + diagnostics_manifest_path=_run_diagnostics_manifest_repo_path_if_available( + run_context.run_id + ), + ) + write_release_promotion_contract( + contract_path=contract_path, + candidate_bundle=candidate_bundle, + promotion_result=promotion_result, + created_at=datetime.now(timezone.utc).isoformat(), + code_sha=meta.sha, + package_version=meta.version, + metadata={ + "writer": "modal_app.pipeline.promote_run", + "branch": meta.branch, + }, + ) + return ArtifactReference.from_path( + contract_path, + role="contract", + base_dir=run_dir, + media_type="application/json", + ) + + @app.function( image=image, timeout=300, @@ -2039,6 +2208,13 @@ def promote_run( promotion_context.to_dict(), ) print(f" {promotion_stdout}") + promotion_result = _promotion_result_from_stdout(promotion_stdout) + release_promotion_contract_ref = _write_release_promotion_contract_for_run( + meta=meta, + run_context=promotion_context, + rel_paths=rel_paths, + promotion_result=promotion_result, + ) # Update run status only after all required promotion work succeeds. meta.status = "promoted" @@ -2047,8 +2223,11 @@ def promote_run( _complete_step_manifest( promote_manifest, outputs=[ - ArtifactReference.from_dict(artifact) - for artifact in promote_inputs["validated_step_outputs"] + *[ + ArtifactReference.from_dict(artifact) + for artifact in promote_inputs["validated_step_outputs"] + ], + release_promotion_contract_ref, ], reuse_decision="computed", vol=pipeline_volume, diff --git a/policyengine_us_data/release_promotion/__init__.py b/policyengine_us_data/release_promotion/__init__.py index ee64c5e5d..20c0c9124 100644 --- a/policyengine_us_data/release_promotion/__init__.py +++ b/policyengine_us_data/release_promotion/__init__.py @@ -24,6 +24,15 @@ build_release_candidate_bundle_from_stage4_contract, read_stage4_release_candidate_bundle, ) +from .contract import ( + RELEASE_PROMOTION_CONTRACT_FILENAME, + RELEASE_PROMOTION_CONTRACT_TYPE, + ReleasePromotionContractBuilder, + build_release_promotion_contract, + release_promotion_contract_path, + release_promotion_contract_repo_path, + write_release_promotion_contract, +) from .results import ( CleanupPromotionResult, CompletionMarkerPromotionResult, @@ -32,6 +41,7 @@ HuggingFacePromotionResult, ReleaseManifestPromotionResult, VersionManifestPromotionResult, + parse_full_promotion_result_json, ) from .validation import build_release_candidate_shape_report from .validation import ( @@ -48,6 +58,8 @@ "BASE_RELEASE_ARTIFACT_PATHS", "DEFAULT_REQUIRED_RELEASE_ARTIFACT_FAMILIES", "RELEASE_VALIDATION_SUBSTAGE_ID", + "RELEASE_PROMOTION_CONTRACT_FILENAME", + "RELEASE_PROMOTION_CONTRACT_TYPE", "CleanupPromotionResult", "CompletionMarkerPromotionResult", "FullPromotionResult", @@ -55,6 +67,7 @@ "HuggingFacePromotionResult", "ReleaseArtifactSpec", "ReleaseCandidateInputBundle", + "ReleasePromotionContractBuilder", "ReleasePromotionContext", "ReleaseCandidateValidationDependencies", "ReleaseCandidateValidator", @@ -63,6 +76,7 @@ "VALIDATION_REPORT_POLICY_PRESENCE_ONLY", "VALIDATION_REPORT_POLICY_REQUIRE_PASSING", "build_legacy_release_candidate_bundle", + "build_release_promotion_contract", "build_release_candidate_bundle_from_stage4_contract", "build_release_candidate_shape_report", "default_release_candidate_validation_dependencies", @@ -71,6 +85,10 @@ "infer_release_artifact_spec", "logical_name_for_release_path", "normalize_release_path", + "parse_full_promotion_result_json", + "release_promotion_contract_path", + "release_promotion_contract_repo_path", "read_stage4_release_candidate_bundle", "strip_staging_prefix", + "write_release_promotion_contract", ] diff --git a/policyengine_us_data/release_promotion/contract.py b/policyengine_us_data/release_promotion/contract.py new file mode 100644 index 000000000..1a7cdc112 --- /dev/null +++ b/policyengine_us_data/release_promotion/contract.py @@ -0,0 +1,497 @@ +"""Stage 5 release promotion contract assembly.""" + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.stage_contracts import ( + ArtifactRef, + DiagnosticRef, + ExecutionRecord, + ReuseSummary, + StageContract, + SubstageRecord, + ValidationReport, + contract_type_for_stage, + write_contract, +) +from policyengine_us_data.stage_contracts._coercion import freeze_sequence +from policyengine_us_data.stage_contracts.fingerprints import fingerprint_material +from policyengine_us_data.stage_contracts.stages import ( + STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, +) + +from .candidate import ReleaseCandidateInputBundle +from .context import ReleasePromotionContext +from .results import FullPromotionResult + +RELEASE_PROMOTION_CONTRACT_FILENAME = "release_promotion_contract.json" +RELEASE_PROMOTION_CONTRACT_TYPE = contract_type_for_stage( + STAGE_5_VALIDATE_AND_PROMOTE_RELEASE +) + + +def release_promotion_contract_repo_path(run_id: str) -> str: + """Return the run-scoped repository path for the Stage 5 contract.""" + + return ( + f"calibration/runs/{run_id}/diagnostics/contracts/" + f"{RELEASE_PROMOTION_CONTRACT_FILENAME}" + ) + + +def release_promotion_contract_path(run_dir: str | Path) -> Path: + """Return the run-local diagnostics/contracts path for the Stage 5 contract.""" + + return ( + Path(run_dir) + / "diagnostics" + / "contracts" + / RELEASE_PROMOTION_CONTRACT_FILENAME + ) + + +@pipeline_node( + id="release_promotion_contract_builder", + label="ReleasePromotionContractBuilder", + node_type="library", + description="Build the canonical Stage 5 release promotion contract.", + status="transitional", + stability="moving", + pathways=["5_validate_and_promote_release"], + artifacts_in=["release candidate bundle", "typed promotion result"], + artifacts_out=["release_promotion_contract.json"], + validation_commands=["uv run pytest tests/unit/release_promotion/test_contract.py"], +) +@dataclass(frozen=True, kw_only=True) +class ReleasePromotionContractBuilder: + """Build a Stage 5 contract from candidate identity and promotion results.""" + + candidate_bundle: ReleaseCandidateInputBundle + promotion_result: FullPromotionResult + created_at: str + code_sha: str | None = None + package_version: str | None = None + validation: ValidationReport | None = None + diagnostics: Sequence[DiagnosticRef] = () + metadata: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + if not isinstance(self.candidate_bundle, ReleaseCandidateInputBundle): + raise ValueError("candidate_bundle must be ReleaseCandidateInputBundle") + if not isinstance(self.promotion_result, FullPromotionResult): + raise ValueError("promotion_result must be FullPromotionResult") + object.__setattr__( + self, + "diagnostics", + freeze_sequence(self.diagnostics, "diagnostics", DiagnosticRef), + ) + _validate_result_matches_candidate( + self.promotion_result, + self.candidate_bundle, + ) + + def build(self) -> StageContract: + """Return the canonical Stage 5 release promotion contract.""" + + context = self.candidate_bundle.context + inputs = _contract_inputs(self.candidate_bundle) + outputs = _contract_outputs(self.promotion_result) + parameters = _contract_parameters( + self.candidate_bundle, + self.promotion_result, + ) + return StageContract( + contract_type=RELEASE_PROMOTION_CONTRACT_TYPE, + stage_id=STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, + run_id=context.run_id, + created_at=self.created_at, + code_sha=self.code_sha, + package_version=self.package_version, + inputs=inputs, + outputs=outputs, + parameters=parameters, + fingerprint=fingerprint_material( + { + "stage_id": STAGE_5_VALIDATE_AND_PROMOTE_RELEASE, + "contract_type": RELEASE_PROMOTION_CONTRACT_TYPE, + "context": context.to_dict(), + "candidate_bundle": self.candidate_bundle.to_dict(), + "promotion_result": self.promotion_result.to_dict(), + "outputs": [output.to_dict() for output in outputs], + } + ), + substages=_substage_records( + candidate_inputs=inputs, + public_outputs=outputs, + promotion_result=self.promotion_result, + ), + execution=_execution_record(self.promotion_result), + validation=self.validation, + diagnostics=tuple(self.diagnostics), + metadata=_contract_metadata( + context=context, + candidate_bundle=self.candidate_bundle, + promotion_result=self.promotion_result, + outputs=outputs, + extra=self.metadata, + ), + ) + + +def build_release_promotion_contract( + *, + candidate_bundle: ReleaseCandidateInputBundle, + promotion_result: FullPromotionResult, + created_at: str, + code_sha: str | None = None, + package_version: str | None = None, + validation: ValidationReport | None = None, + diagnostics: Sequence[DiagnosticRef] = (), + metadata: Mapping[str, Any] | None = None, +) -> StageContract: + """Build the Stage 5 release promotion contract.""" + + return ReleasePromotionContractBuilder( + candidate_bundle=candidate_bundle, + promotion_result=promotion_result, + created_at=created_at, + code_sha=code_sha, + package_version=package_version, + validation=validation, + diagnostics=diagnostics, + metadata=metadata or {}, + ).build() + + +def write_release_promotion_contract( + *, + contract_path: str | Path, + candidate_bundle: ReleaseCandidateInputBundle, + promotion_result: FullPromotionResult, + created_at: str, + code_sha: str | None = None, + package_version: str | None = None, + validation: ValidationReport | None = None, + diagnostics: Sequence[DiagnosticRef] = (), + metadata: Mapping[str, Any] | None = None, +) -> StageContract: + """Build, write, and return the Stage 5 release promotion contract.""" + + contract = build_release_promotion_contract( + candidate_bundle=candidate_bundle, + promotion_result=promotion_result, + created_at=created_at, + code_sha=code_sha, + package_version=package_version, + validation=validation, + diagnostics=diagnostics, + metadata=metadata, + ) + write_contract(contract, contract_path) + return contract + + +def _validate_result_matches_candidate( + result: FullPromotionResult, + candidate_bundle: ReleaseCandidateInputBundle, +) -> None: + context = candidate_bundle.context + if result.run_id != context.run_id: + raise ValueError("promotion_result.run_id must match context.run_id") + if result.candidate_version != context.candidate_version: + raise ValueError( + "promotion_result.candidate_version must match context.candidate_version" + ) + if result.release_version != context.release_version: + raise ValueError( + "promotion_result.release_version must match context.release_version" + ) + if result.hf.repo_name != context.hf_repo_name: + raise ValueError( + "promotion_result.hf.repo_name must match context.hf_repo_name" + ) + if result.gcs.bucket_name != context.gcs_bucket_name: + raise ValueError( + "promotion_result.gcs.bucket_name must match context.gcs_bucket_name" + ) + if result.artifact_count != len(candidate_bundle.artifacts): + raise ValueError( + "promotion_result.artifact_count must match candidate artifacts" + ) + + +def _contract_inputs( + candidate_bundle: ReleaseCandidateInputBundle, +) -> tuple[ArtifactRef, ...]: + context = candidate_bundle.context + inputs = [ + artifact.to_artifact_ref( + uri_prefix=f"hf://{context.hf_repo_name}/{context.hf_staging_prefix}", + ) + for artifact in candidate_bundle.artifacts + ] + if candidate_bundle.source_output_contract_path is not None: + inputs.append( + ArtifactRef( + logical_name="stage4_output_contract", + uri=f"hf://{context.hf_repo_name}/{candidate_bundle.source_output_contract_path}", + media_type="application/json", + metadata={ + "artifact_family": "stage_contract", + "source_stage_id": "4_build_outputs", + }, + ) + ) + for index, path in enumerate(candidate_bundle.validation_report_paths, start=1): + inputs.append( + ArtifactRef( + logical_name=f"validation_report_{index}", + uri=f"hf://{context.hf_repo_name}/{path}", + media_type=_diagnostic_media_type(path), + metadata={"artifact_family": "validation_report"}, + ) + ) + if candidate_bundle.diagnostics_manifest_path is not None: + inputs.append( + ArtifactRef( + logical_name="diagnostics_manifest", + uri=( + f"hf://{context.hf_repo_name}/" + f"{candidate_bundle.diagnostics_manifest_path}" + ), + media_type="application/json", + metadata={"artifact_family": "diagnostics_manifest"}, + ) + ) + return tuple(inputs) + + +def _diagnostic_media_type(path: str) -> str: + """Return a conservative media type for run diagnostics referenced by contract.""" + + if path.endswith(".csv"): + return "text/csv" + if path.endswith(".txt"): + return "text/plain" + return "application/json" + + +def _contract_outputs(result: FullPromotionResult) -> tuple[ArtifactRef, ...]: + hf_base = f"hf://{result.hf.repo_name}" + return ( + ArtifactRef( + logical_name="huggingface_release_artifacts", + uri=f"{hf_base}/", + metadata={ + "artifact_family": "release_artifact_collection", + "artifact_count": result.artifact_count, + "promoted_count": result.hf.promoted_count, + "already_finalized": result.already_finalized, + "repo_type": result.hf.repo_type, + "hf_commit": result.hf.commit_id, + "promoted_paths": list(result.hf.promoted_paths), + "noop_paths": list(result.hf.noop_paths), + }, + ), + ArtifactRef( + logical_name="gcs_release_artifacts", + uri=f"gs://{result.gcs.bucket_name}/", + metadata={ + "artifact_family": "release_artifact_collection", + "artifact_count": result.artifact_count, + "uploaded_count": result.gcs.uploaded_count, + "already_finalized": result.already_finalized, + "object_paths": list(result.gcs.object_paths), + "skipped_paths": list(result.gcs.skipped_paths), + }, + ), + ArtifactRef( + logical_name="release_manifest", + uri=_hf_artifact_uri( + result.hf.repo_name, result.release_manifest.root_path + ), + sha256=result.release_manifest.manifest_sha256, + media_type="application/json", + metadata={ + "artifact_family": "release_manifest", + "artifact_count": result.release_manifest.artifact_count, + }, + ), + ArtifactRef( + logical_name="versioned_release_manifest", + uri=_hf_artifact_uri( + result.hf.repo_name, + result.release_manifest.versioned_path, + ), + sha256=result.release_manifest.manifest_sha256, + media_type="application/json", + metadata={ + "artifact_family": "release_manifest", + "artifact_count": result.release_manifest.artifact_count, + }, + ), + ArtifactRef( + logical_name="trace_tro", + uri=_hf_artifact_uri( + result.hf.repo_name, result.release_manifest.trace_tro_path + ), + media_type="application/ld+json", + metadata={"artifact_family": "trace_tro"}, + ), + ArtifactRef( + logical_name="versioned_trace_tro", + uri=_hf_artifact_uri( + result.hf.repo_name, + result.release_manifest.versioned_trace_tro_path, + ), + media_type="application/ld+json", + metadata={"artifact_family": "trace_tro"}, + ), + ArtifactRef( + logical_name="version_manifest", + uri=_hf_artifact_uri(result.hf.repo_name, result.version_manifest.path), + media_type="application/json", + metadata={ + "artifact_family": "version_manifest", + "updated": result.version_manifest.updated, + }, + ), + ArtifactRef( + logical_name="release_completion_marker", + uri=_hf_artifact_uri( + result.hf.repo_name, result.completion_marker.marker_path + ), + media_type="application/json", + metadata={"artifact_family": "release_completion_marker"}, + ), + ) + + +def _hf_artifact_uri(repo_name: str, repo_path: str) -> str: + """Return a Hugging Face URI from typed promotion result path material.""" + + return f"hf://{repo_name}/{repo_path.lstrip('/')}" + + +def _contract_parameters( + candidate_bundle: ReleaseCandidateInputBundle, + result: FullPromotionResult, +) -> dict[str, Any]: + context = candidate_bundle.context + return { + "run_id": context.run_id, + "candidate_version": context.candidate_version, + "release_version": context.release_version, + "base_release_version": context.base_release_version, + "release_bump": context.release_bump, + "hf_repo_name": context.hf_repo_name, + "hf_repo_type": context.hf_repo_type, + "gcs_bucket_name": context.gcs_bucket_name, + "hf_staging_prefix": context.hf_staging_prefix, + "artifact_count": result.artifact_count, + "release_candidate_fingerprint": ( + candidate_bundle.release_candidate_fingerprint + ), + "source_output_contract_path": candidate_bundle.source_output_contract_path, + "validation_report_paths": list(candidate_bundle.validation_report_paths), + "diagnostics_manifest_path": candidate_bundle.diagnostics_manifest_path, + } + + +def _contract_metadata( + *, + context: ReleasePromotionContext, + candidate_bundle: ReleaseCandidateInputBundle, + promotion_result: FullPromotionResult, + outputs: Sequence[ArtifactRef], + extra: Mapping[str, Any], +) -> dict[str, Any]: + return { + **dict(extra), + "contract_file": RELEASE_PROMOTION_CONTRACT_FILENAME, + "contract_repo_path": release_promotion_contract_repo_path(context.run_id), + "candidate_bundle_type": candidate_bundle.bundle_type, + "candidate_metadata": candidate_bundle.metadata, + "cleanup": promotion_result.cleanup.to_dict(), + "already_finalized": promotion_result.already_finalized, + "promotion_result": promotion_result.to_dict(), + "public_refs": {output.logical_name: output.uri for output in outputs}, + } + + +def _execution_record(result: FullPromotionResult) -> ExecutionRecord: + return ExecutionRecord( + status="completed", + reuse_decision="reused" if result.already_finalized else "computed", + reuse_reason=( + "already_finalized" if result.already_finalized else "fresh_promotion" + ), + reuse_summary=ReuseSummary( + expected_outputs=result.artifact_count, + valid_reused_outputs=( + result.artifact_count if result.already_finalized else 0 + ), + recomputed_outputs=0 if result.already_finalized else result.artifact_count, + ), + ) + + +def _substage_records( + *, + candidate_inputs: Sequence[ArtifactRef], + public_outputs: Sequence[ArtifactRef], + promotion_result: FullPromotionResult, +) -> tuple[SubstageRecord, ...]: + outputs_by_name = {artifact.logical_name: artifact for artifact in public_outputs} + return ( + SubstageRecord( + substage_id="5a_validate_outputs", + status="completed", + inputs=tuple(candidate_inputs), + reuse_mode="observed_only", + metadata={"artifact_count": promotion_result.artifact_count}, + ), + SubstageRecord( + substage_id="5b_promote_huggingface", + status="completed", + outputs=(outputs_by_name["huggingface_release_artifacts"],), + reuse_mode="handoff", + metadata={ + "promoted_count": promotion_result.hf.promoted_count, + "already_finalized": promotion_result.already_finalized, + }, + ), + SubstageRecord( + substage_id="5c_promote_gcs", + status="completed", + outputs=(outputs_by_name["gcs_release_artifacts"],), + reuse_mode="handoff", + metadata={ + "uploaded_count": promotion_result.gcs.uploaded_count, + "already_finalized": promotion_result.already_finalized, + }, + ), + SubstageRecord( + substage_id="5d_write_version_manifest", + status="completed", + outputs=( + outputs_by_name["release_manifest"], + outputs_by_name["versioned_release_manifest"], + outputs_by_name["trace_tro"], + outputs_by_name["versioned_trace_tro"], + outputs_by_name["version_manifest"], + outputs_by_name["release_completion_marker"], + ), + reuse_mode="handoff", + metadata={ + "version_manifest_updated": promotion_result.version_manifest.updated, + "cleanup": promotion_result.cleanup.to_dict(), + "already_finalized": promotion_result.already_finalized, + }, + ), + ) diff --git a/policyengine_us_data/release_promotion/results/__init__.py b/policyengine_us_data/release_promotion/results/__init__.py index 761d14f55..3d10b5acb 100644 --- a/policyengine_us_data/release_promotion/results/__init__.py +++ b/policyengine_us_data/release_promotion/results/__init__.py @@ -11,7 +11,7 @@ GcsPromotionResult, HuggingFacePromotionResult, ) -from .full import FullPromotionResult +from .full import FullPromotionResult, parse_full_promotion_result_json from .manifests import ( CompletionMarkerPromotionResult, ReleaseManifestPromotionResult, @@ -28,6 +28,7 @@ "FullPromotionResult", "GcsPromotionResult", "HuggingFacePromotionResult", + "parse_full_promotion_result_json", "ReleaseManifestPromotionResult", "VersionManifestPromotionResult", ] diff --git a/policyengine_us_data/release_promotion/results/full.py b/policyengine_us_data/release_promotion/results/full.py index e4cd9ae02..8eee35f50 100644 --- a/policyengine_us_data/release_promotion/results/full.py +++ b/policyengine_us_data/release_promotion/results/full.py @@ -4,6 +4,7 @@ from collections.abc import Mapping from dataclasses import dataclass, field +import json from typing import Any from policyengine_us_data.pipeline_metadata import pipeline_node @@ -292,3 +293,15 @@ def from_legacy_dict(cls, data: Mapping[str, Any]) -> "FullPromotionResult": ), already_finalized=already_finalized, ) + + +def parse_full_promotion_result_json(payload: str) -> FullPromotionResult: + """Parse a JSON legacy promotion payload into a typed promotion result.""" + + try: + data = json.loads(payload) + except json.JSONDecodeError as exc: + raise ValueError("promotion result payload must be JSON") from exc + if not isinstance(data, Mapping): + raise ValueError("promotion result payload must be a JSON object") + return FullPromotionResult.from_legacy_dict(data) diff --git a/pyproject.toml b/pyproject.toml index e6c05b530..8f17d15d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "policyengine-us==1.703.1", + "policyengine-us==1.703.2", # policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for # PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost # after _invalidate_all_caches) and is required by policyengine-us 1.682.1+. diff --git a/tests/unit/release_promotion/test_contract.py b/tests/unit/release_promotion/test_contract.py new file mode 100644 index 000000000..334f3ba3d --- /dev/null +++ b/tests/unit/release_promotion/test_contract.py @@ -0,0 +1,276 @@ +import json + +import pytest + +from policyengine_us_data.release_promotion import ( + RELEASE_PROMOTION_CONTRACT_FILENAME, + RELEASE_PROMOTION_CONTRACT_TYPE, + FullPromotionResult, + ReleasePromotionContext, + build_legacy_release_candidate_bundle, + build_release_promotion_contract, + release_promotion_contract_path, + release_promotion_contract_repo_path, + write_release_promotion_contract, +) +from policyengine_us_data.stage_contracts import ( + ArtifactRef, + DiagnosticRef, + StageContract, + ValidationFinding, + ValidationReport, + contract_to_json, + read_contract, +) + + +def _context() -> ReleasePromotionContext: + return ReleasePromotionContext( + run_id="run-123", + candidate_version="1.73.0rc1", + release_version="1.73.0", + hf_repo_name="policyengine/policyengine-us-data", + gcs_bucket_name="policyengine-us-data", + base_release_version="1.72.0", + release_bump="minor", + modal_app_name="us-data-run-123", + modal_environment="main", + ) + + +def _candidate_bundle(): + return build_legacy_release_candidate_bundle( + context=_context(), + rel_paths=["states/AL.h5", "policy_data.db"], + artifact_metadata_by_path={ + "states/AL.h5": {"sha256": "sha256:state-al", "size_bytes": 12}, + "policy_data.db": {"sha256": "sha256:policy-db", "size_bytes": 24}, + }, + validation_report_paths=[ + "calibration/runs/run-123/diagnostics/validation_report.json" + ], + diagnostics_manifest_path="calibration/runs/run-123/diagnostics/manifest.json", + source_output_contract_path=( + "calibration/runs/run-123/diagnostics/contracts/output_build_contract.json" + ), + ) + + +def _legacy_promotion_payload(**overrides): + already_finalized = overrides.pop("already_finalized", False) + rel_paths = ("states/AL.h5", "policy_data.db") + payload = { + "run_id": "run-123", + "candidate_version": "1.73.0rc1", + "release_version": "1.73.0", + "rel_paths": rel_paths, + "artifact_count": 2, + "hf_repo_name": "policyengine/policyengine-us-data", + "hf_repo_type": "model", + "hf_staging_prefix": "staging/1.73.0rc1-run-123", + "hf_promoted": 0 if already_finalized else 2, + "hf_promoted_paths": rel_paths, + "hf_commit_id": None, + "hf_noop_paths": rel_paths if already_finalized else (), + "gcs_bucket_name": "policyengine-us-data", + "gcs_uploaded": 0 if already_finalized else 2, + "gcs_object_paths": rel_paths, + "gcs_skipped_paths": rel_paths if already_finalized else (), + "gcs_failures": (), + "release_manifest_path": "release_manifest.json", + "versioned_release_manifest_path": ("releases/1.73.0/release_manifest.json"), + "trace_tro_path": "trace.tro.jsonld", + "versioned_trace_tro_path": "releases/1.73.0/trace.tro.jsonld", + "release_manifest_sha256": None, + "release_manifest_artifacts": 2, + "version_manifest_path": "version_manifest.json", + "version_manifest_version": "1.73.0", + "version_manifest_current_version": "1.73.0", + "version_manifest_updated": not already_finalized, + "release_completion_marker": "releases/1.73.0/release-complete.json", + "release_completion_tag": "1.73.0", + "release_completion_valid": True, + "staging_cleaned": 3, + "staging_cleanup_attempted": True, + "staging_cleanup_status": "completed", + "already_finalized": already_finalized, + } + payload.update(overrides) + return payload + + +def _promotion_result(*, already_finalized: bool = False) -> FullPromotionResult: + return FullPromotionResult.from_legacy_dict( + _legacy_promotion_payload(already_finalized=already_finalized) + ) + + +def _validation_report() -> ValidationReport: + diagnostic = DiagnosticRef( + name="validation_report", + kind="json", + artifact=ArtifactRef( + logical_name="validation_report", + uri=( + "hf://policyengine/policyengine-us-data/calibration/runs/" + "run-123/diagnostics/validation_report.json" + ), + media_type="application/json", + ), + ) + return ValidationReport( + status="pass", + findings=( + ValidationFinding( + check_id="release_candidate_identity_declared", + status="pass", + message="candidate identity is declared", + ), + ), + diagnostics=(diagnostic,), + metadata={"stage_id": "5_validate_and_promote_release"}, + ) + + +def test_release_promotion_contract_records_candidate_and_public_refs() -> None: + contract = build_release_promotion_contract( + candidate_bundle=_candidate_bundle(), + promotion_result=_promotion_result(), + created_at="2026-05-18T12:00:00+00:00", + code_sha="abc123", + package_version="1.73.0", + validation=_validation_report(), + metadata={"writer": "test"}, + ) + + input_names = {artifact.logical_name for artifact in contract.inputs} + output_names = {artifact.logical_name for artifact in contract.outputs} + + assert contract.contract_type == RELEASE_PROMOTION_CONTRACT_TYPE + assert contract.stage_id == "5_validate_and_promote_release" + assert contract.run_id == "run-123" + assert "stage4_output_contract" in input_names + assert "validation_report_1" in input_names + assert "diagnostics_manifest" in input_names + assert output_names == { + "huggingface_release_artifacts", + "gcs_release_artifacts", + "release_manifest", + "versioned_release_manifest", + "trace_tro", + "versioned_trace_tro", + "version_manifest", + "release_completion_marker", + } + assert contract.execution.status == "completed" + assert contract.execution.reuse_decision == "computed" + assert contract.execution.reuse_summary.expected_outputs == 2 + assert contract.parameters["release_candidate_fingerprint"] + assert contract.parameters["source_output_contract_path"] == ( + "calibration/runs/run-123/diagnostics/contracts/output_build_contract.json" + ) + assert contract.metadata["contract_file"] == RELEASE_PROMOTION_CONTRACT_FILENAME + assert contract.metadata["already_finalized"] is False + assert contract.metadata["cleanup"]["cleaned_count"] == 3 + assert contract.metadata["public_refs"]["release_manifest"] == ( + "hf://policyengine/policyengine-us-data/release_manifest.json" + ) + assert [substage.substage_id for substage in contract.substages] == [ + "5a_validate_outputs", + "5b_promote_huggingface", + "5c_promote_gcs", + "5d_write_version_manifest", + ] + assert StageContract.from_dict(json.loads(contract_to_json(contract))) == contract + + +def test_release_promotion_contract_uses_typed_result_public_paths() -> None: + result = FullPromotionResult.from_legacy_dict( + _legacy_promotion_payload( + release_manifest_path="manifests/current_release.json", + versioned_release_manifest_path=( + "release-history/1.73.0/release_manifest.json" + ), + trace_tro_path="provenance/current_trace.jsonld", + versioned_trace_tro_path="release-history/1.73.0/trace.tro.jsonld", + version_manifest_path="registry/version_manifest.json", + release_completion_marker="release-history/1.73.0/complete.json", + release_manifest_sha256="sha256:manifest", + hf_commit_id="abc123", + ) + ) + + contract = build_release_promotion_contract( + candidate_bundle=_candidate_bundle(), + promotion_result=result, + created_at="2026-05-18T12:00:00+00:00", + ) + refs = {artifact.logical_name: artifact for artifact in contract.outputs} + + assert refs["release_manifest"].uri == ( + "hf://policyengine/policyengine-us-data/manifests/current_release.json" + ) + assert refs["versioned_release_manifest"].uri == ( + "hf://policyengine/policyengine-us-data/" + "release-history/1.73.0/release_manifest.json" + ) + assert refs["trace_tro"].uri == ( + "hf://policyengine/policyengine-us-data/provenance/current_trace.jsonld" + ) + assert refs["version_manifest"].uri == ( + "hf://policyengine/policyengine-us-data/registry/version_manifest.json" + ) + assert refs["release_completion_marker"].uri == ( + "hf://policyengine/policyengine-us-data/release-history/1.73.0/complete.json" + ) + assert refs["release_manifest"].sha256 == "sha256:manifest" + assert refs["huggingface_release_artifacts"].metadata["hf_commit"] == "abc123" + + +def test_release_promotion_contract_records_already_finalized_reuse() -> None: + contract = build_release_promotion_contract( + candidate_bundle=_candidate_bundle(), + promotion_result=_promotion_result(already_finalized=True), + created_at="2026-05-18T12:00:00+00:00", + ) + + assert contract.execution.reuse_decision == "reused" + assert contract.execution.reuse_reason == "already_finalized" + assert contract.execution.reuse_summary.valid_reused_outputs == 2 + assert contract.execution.reuse_summary.recomputed_outputs == 0 + + +def test_write_release_promotion_contract_writes_run_diagnostics_path(tmp_path) -> None: + contract_path = release_promotion_contract_path(tmp_path / "run-123") + + written = write_release_promotion_contract( + contract_path=contract_path, + candidate_bundle=_candidate_bundle(), + promotion_result=_promotion_result(), + created_at="2026-05-18T12:00:00+00:00", + ) + + assert contract_path == ( + tmp_path + / "run-123" + / "diagnostics" + / "contracts" + / "release_promotion_contract.json" + ) + assert read_contract(contract_path) == written + assert release_promotion_contract_repo_path("run-123") == ( + "calibration/runs/run-123/diagnostics/contracts/release_promotion_contract.json" + ) + + +def test_release_promotion_contract_rejects_mismatched_result_identity() -> None: + result = FullPromotionResult.from_legacy_dict( + _legacy_promotion_payload(run_id="other-run") + ) + + with pytest.raises(ValueError, match="run_id"): + build_release_promotion_contract( + candidate_bundle=_candidate_bundle(), + promotion_result=result, + created_at="2026-05-18T12:00:00+00:00", + ) diff --git a/tests/unit/release_promotion/test_results.py b/tests/unit/release_promotion/test_results.py index 52f62d4e2..1e43bb8e7 100644 --- a/tests/unit/release_promotion/test_results.py +++ b/tests/unit/release_promotion/test_results.py @@ -1,8 +1,12 @@ +import json from pathlib import Path import pytest -from policyengine_us_data.release_promotion import FullPromotionResult +from policyengine_us_data.release_promotion import ( + FullPromotionResult, + parse_full_promotion_result_json, +) from policyengine_us_data.utils.release_promotion import ( FullReleasePromotionConfig, FullReleasePromotionDependencies, @@ -220,6 +224,22 @@ def test_full_promotion_result_wraps_legacy_dict() -> None: assert FullPromotionResult.from_dict(result.to_dict()) == result +def test_parse_full_promotion_result_json_wraps_legacy_subprocess_payload() -> None: + result = parse_full_promotion_result_json( + json.dumps(_legacy_result_payload(artifact_count=2)) + ) + + assert result.run_id == "run-123" + assert result.hf.repo_name == "policyengine/policyengine-us-data" + assert result.gcs.bucket_name == "policyengine-us-data" + assert result.release_manifest.root_path == _RELEASE_MANIFEST_PATH + + +def test_parse_full_promotion_result_json_rejects_invalid_payload() -> None: + with pytest.raises(ValueError, match="must be JSON"): + parse_full_promotion_result_json("not-json") + + def test_promote_full_release_with_result_preserves_transaction_order(tmp_path) -> None: rel_paths = ("cps_2024.h5", "states/AL.h5", "national/US.h5") files = _make_files(tmp_path, rel_paths) diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 199d63747..c5137a741 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -19,7 +19,10 @@ _calibration_package_parameters, _new_run_metadata, _pipeline_error_summary, + _promotion_result_from_stdout, + _release_artifact_metadata_by_path, _run_required_promotion_subprocess, + _stage4_output_contract_repo_path_if_available, _traceback_text_for_pipeline_failure, _try_reload_pipeline_volume_after_h5_builds, ) @@ -183,6 +186,85 @@ def test_pipeline_failure_traceback_prefers_stage_1_command_tail(): assert "actual ecps failure" in traceback_text +def test_promotion_result_from_stdout_returns_typed_result(): + result = _promotion_result_from_stdout( + json.dumps( + { + "run_id": "run-123", + "candidate_version": "1.73.0rc1", + "release_version": "1.73.0", + "rel_paths": ("states/AL.h5",), + "artifact_count": 1, + "hf_repo_name": "policyengine/policyengine-us-data", + "hf_repo_type": "model", + "hf_staging_prefix": "staging/1.73.0rc1-run-123", + "hf_promoted": 1, + "hf_promoted_paths": ("states/AL.h5",), + "hf_commit_id": None, + "hf_noop_paths": (), + "gcs_bucket_name": "policyengine-us-data", + "gcs_uploaded": 1, + "gcs_object_paths": ("states/AL.h5",), + "gcs_skipped_paths": (), + "gcs_failures": (), + "release_manifest_path": "release_manifest.json", + "versioned_release_manifest_path": ( + "releases/1.73.0/release_manifest.json" + ), + "trace_tro_path": "trace.tro.jsonld", + "versioned_trace_tro_path": "releases/1.73.0/trace.tro.jsonld", + "release_manifest_sha256": None, + "release_manifest_artifacts": 1, + "version_manifest_path": "version_manifest.json", + "version_manifest_version": "1.73.0", + "version_manifest_current_version": "1.73.0", + "version_manifest_updated": True, + "release_completion_marker": ("releases/1.73.0/release-complete.json"), + "release_completion_tag": "1.73.0", + "release_completion_valid": True, + "staging_cleaned": 2, + "staging_cleanup_attempted": True, + "staging_cleanup_status": "completed", + } + ) + ) + + assert result.run_id == "run-123" + assert result.artifact_count == 1 + assert result.hf.promoted_count == 1 + + +def test_release_artifact_metadata_by_path_uses_local_files(tmp_path, monkeypatch): + artifact = tmp_path / "states" / "AL.h5" + artifact.parent.mkdir(parents=True) + artifact.write_text("state fixture", encoding="utf-8") + + monkeypatch.setattr( + "modal_app.pipeline._full_release_manifest_files", + lambda run_id, rel_paths: [(artifact, "states/AL.h5")], + ) + + metadata = _release_artifact_metadata_by_path("run-123", ["states/AL.h5"]) + + assert metadata["states/AL.h5"]["sha256"].startswith("sha256:") + assert metadata["states/AL.h5"]["size_bytes"] == artifact.stat().st_size + + +def test_stage4_output_contract_repo_path_detects_run_local_contract( + tmp_path, + monkeypatch, +): + run_dir = tmp_path / "run-123" + contract_path = run_dir / "diagnostics" / "contracts" / "output_build_contract.json" + contract_path.parent.mkdir(parents=True) + contract_path.write_text("{}", encoding="utf-8") + monkeypatch.setattr("modal_app.pipeline._run_dir", lambda run_id: run_dir) + + assert _stage4_output_contract_repo_path_if_available("run-123") == ( + "calibration/runs/run-123/diagnostics/contracts/output_build_contract.json" + ) + + def test_new_run_metadata_accepts_release_context_fields_once(): context = RunContext.from_mapping( { diff --git a/tests/unit/test_pipeline_source_contracts.py b/tests/unit/test_pipeline_source_contracts.py index 36820915b..1f964af08 100644 --- a/tests/unit/test_pipeline_source_contracts.py +++ b/tests/unit/test_pipeline_source_contracts.py @@ -29,6 +29,9 @@ def test_promote_run_uses_single_full_release_promotion() -> None: assert "_apply_run_context_env(promotion_context)" in source assert "_promote_full_release_from_staging(" in source assert "promotion_context.to_dict()" in source + assert "_promotion_result_from_stdout(promotion_stdout)" in source + assert "_write_release_promotion_contract_for_run(" in source + assert "release_promotion_contract_ref" in source assert "promote_publish.remote(" not in source assert "promote_national_publish.remote(" not in source assert "upload_datasets(" not in source @@ -122,6 +125,46 @@ def test_promote_run_uses_unified_staged_release_path() -> None: assert 'extra_cleanup_paths=["_run_context.json"]' in source +def test_promotion_stdout_parser_uses_stage5_result_parser() -> None: + tree = ast.parse(PIPELINE_SOURCE.read_text()) + helper = _function_def(tree, "_promotion_result_from_stdout") + source = ast.get_source_segment(PIPELINE_SOURCE.read_text(), helper) + + assert "parse_full_promotion_result_json" in source + assert "FullPromotionResult.from_legacy_dict" not in source + assert "json.loads" not in source + + +def test_promote_run_writes_release_promotion_contract_output() -> None: + tree = ast.parse(PIPELINE_SOURCE.read_text()) + helper = _function_def(tree, "_write_release_promotion_contract_for_run") + stage4_helper = _function_def( + tree, + "_stage4_output_contract_repo_path_if_available", + ) + helper_source = ast.get_source_segment(PIPELINE_SOURCE.read_text(), helper) + stage4_source = ast.get_source_segment(PIPELINE_SOURCE.read_text(), stage4_helper) + + assert "release_promotion_contract_path(run_dir)" in helper_source + assert "build_legacy_release_candidate_bundle(" in helper_source + assert "write_release_promotion_contract(" in helper_source + assert 'role="contract"' in helper_source + assert 'media_type="application/json"' in helper_source + assert "validation_report_paths=_run_validation_report_repo_paths_if_available" in ( + helper_source + ) + assert ( + "diagnostics_manifest_path=_run_diagnostics_manifest_repo_path_if_available" + in helper_source + ) + assert ( + "source_output_contract_path=_stage4_output_contract_repo_path_if_available" + in (helper_source) + ) + assert 'diagnostics" / "contracts" / "output_build_contract.json"' in stage4_source + assert "calibration/runs/{run_id}/" in stage4_source + + def test_run_pipeline_refreshes_diagnostics_even_when_h5_outputs_reused() -> None: tree = ast.parse(PIPELINE_SOURCE.read_text()) run_pipeline = _function_def(tree, "run_pipeline") diff --git a/uv.lock b/uv.lock index f96ba5ab0..1c081b6bf 100644 --- a/uv.lock +++ b/uv.lock @@ -2122,7 +2122,7 @@ wheels = [ [[package]] name = "policyengine-us" -version = "1.703.1" +version = "1.703.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "microdf-python" }, @@ -2132,9 +2132,9 @@ dependencies = [ { name = "tables" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/2f/d7/6268c87ecb05e3aa1edaee9dc79467da8c96c69dc5b6139754bbf9e1970d/policyengine_us-1.703.1.tar.gz", hash = "sha256:951cf922550849890a73442282cc1e013852b270c3b3b4e24aca5ae29e6e811d", size = 9886740, upload-time = "2026-05-21T22:17:47.309Z" } +sdist = { url = "https://files.pythonhosted.org/packages/1f/ac/911ba9f1bfd20b990078b5a83e23b837a178ce1de370d2928929f068c0d3/policyengine_us-1.703.2.tar.gz", hash = "sha256:88fa00e78f54acefb80e2504f8ce4ae2e8c6b9eb5d5ad1a93d32f03bc12b7a14", size = 9888715, upload-time = "2026-05-22T14:25:54.372Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/8f/91/dc40a435fb0af3cdf62fa476b87674a2fb4cfd221137f2c5a98ce194d96a/policyengine_us-1.703.1-py3-none-any.whl", hash = "sha256:39445e07e7616d5c4da006a0836cf8c2b326f6f6dec1c8b633bb835cf8682f35", size = 10680928, upload-time = "2026-05-21T22:17:43.642Z" }, + { url = "https://files.pythonhosted.org/packages/39/9c/d9418fb767786574abed014dd6cf717ff56795f23e64fb2ecf9bdccee852/policyengine_us-1.703.2-py3-none-any.whl", hash = "sha256:317be03c6be00bb4de5a2a422b64e15638429ccdbdf5f16c87b00b43ff655c30", size = 10683478, upload-time = "2026-05-22T14:25:50.977Z" }, ] [[package]] @@ -2204,7 +2204,7 @@ requires-dist = [ { name = "pandas", specifier = ">=2.3.1" }, { name = "pip-system-certs", specifier = ">=3.0" }, { name = "policyengine-core", specifier = ">=3.26.1,<3.27" }, - { name = "policyengine-us", specifier = "==1.703.1" }, + { name = "policyengine-us", specifier = "==1.703.2" }, { name = "requests", specifier = ">=2.25.0" }, { name = "samplics", marker = "extra == 'calibration'" }, { name = "scipy", specifier = ">=1.15.3" },