diff --git a/changelog.d/1026.added b/changelog.d/1026.added new file mode 100644 index 000000000..8f15a5f66 --- /dev/null +++ b/changelog.d/1026.added @@ -0,0 +1 @@ +Added typed Stage 5 promotion result models around the existing release transaction engine. diff --git a/docs/engineering/pipeline-map.md b/docs/engineering/pipeline-map.md index c7dd50336..19222b6a4 100644 --- a/docs/engineering/pipeline-map.md +++ b/docs/engineering/pipeline-map.md @@ -882,6 +882,14 @@ def fitted_weights_spec_for_scope(scope: FitScope | str) -> FittedWeightsSpec Return the current fitted-weight spec for a regional or national scope. +### `policyengine_us_data.release_promotion.results.full.FullPromotionResult` + +```python +class FullPromotionResult +``` + +Typed result for a full Stage 5 release promotion transaction. + ### `policyengine_us_data.datasets.cps.extended_cps.ExtendedCPS._validate_housing_assistance_microsimulation` ```python @@ -1450,6 +1458,14 @@ def stage_1_step_specs() -> tuple[DatasetBuildStepSpec, ...] Return the canonical Stage 1 dataset-build substage specs. +### `policyengine_us_data.utils.release_promotion.promote_full_release_with_result` + +```python +def promote_full_release_with_result(config: FullReleasePromotionConfig, deps: FullReleasePromotionDependencies) -> 'FullPromotionResult' +``` + +Run the existing transaction engine and wrap its output in a typed result. + ### `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder` ```python diff --git a/docs/engineering/skills/documentation_review.md b/docs/engineering/skills/documentation_review.md index fd59a11b7..34c6a135d 100644 --- a/docs/engineering/skills/documentation_review.md +++ b/docs/engineering/skills/documentation_review.md @@ -47,6 +47,10 @@ Check that changed pipeline behavior has a durable documentation surface: - Generated docs build when decorator, Pydoc, or map source changes. PRs that change decorator metadata, Pydoc-facing source, or `docs/pipeline_map.yaml` should refresh the checked-in generated artifacts in the same change. +- Pipeline documentation segment edits require the same treatment: if a PR + changes source text that feeds generated pipeline docs or AI-facing pipeline + guidance, verify whether `scripts/extract_pipeline_docs.py` output changes and + commit the refreshed generated artifacts when it does. - Stale architecture names, folder names, and artifact names are not preserved in durable documentation sources or generated output. diff --git a/docs/engineering/skills/pipeline_docs.md b/docs/engineering/skills/pipeline_docs.md index 9936d62de..e5bccc829 100644 --- a/docs/engineering/skills/pipeline_docs.md +++ b/docs/engineering/skills/pipeline_docs.md @@ -19,7 +19,14 @@ The generated JSON and Markdown files are published artifacts, not hand-authored source. PRs should update decorators, docstrings, and `docs/pipeline_map.yaml`, then regenerate the checked-in artifacts in the same change so reviewers see the pipeline docs that will ship. On pushes to `main`, automation may refresh those -artifacts again with the version/changelog commit. +artifacts again with the version/changelog commit, but PR authors and AI agents +must not rely on that later automation for review correctness. + +Any time a PR touches a pipeline documentation segment, a `@pipeline_node` +decorator, Pydoc-facing text that feeds the extractor, or +`docs/pipeline_map.yaml`, regenerate and commit the checked-in docs produced by +`scripts/extract_pipeline_docs.py`. Treat the generated docs as part of the same +change, even if the source edit is small. ## Annotation Rules diff --git a/docs/engineering/stages/release_promotion.md b/docs/engineering/stages/release_promotion.md index db5fc73c7..9c9778ac5 100644 --- a/docs/engineering/stages/release_promotion.md +++ b/docs/engineering/stages/release_promotion.md @@ -103,3 +103,21 @@ perform Hugging Face writes, GCS uploads, Modal calls, staging cleanup, or release-manifest publication. Keep those operations behind explicit adapters or services so tests can exercise candidate shape and validation logic without credentials or network access. + +Use `FullPromotionResult` and its substep result objects when exposing Stage 5 +promotion outcomes to contracts, status APIs, or orchestration summaries. The +current compatibility wrapper, `promote_full_release_with_result()`, must keep +calling the existing transaction engine first and only wrap its dictionary +output afterward so the promotion order remains unchanged. + +Result objects should carry semantic public-output identity, not only counts. +Keep Hugging Face repo/type, staging prefix, promoted/no-op paths, and commit ID +when available on `HuggingFacePromotionResult`; GCS bucket, release version, +object paths, skipped paths, and failures on `GcsPromotionResult`; release +manifest and TRACE TRO paths on `ReleaseManifestPromotionResult`; version +manifest path/version/current version on `VersionManifestPromotionResult`; +completion marker path/tag/validity on `CompletionMarkerPromotionResult`; and +cleanup `status` as `skipped`, `completed`, or `failed` on +`CleanupPromotionResult`. Later contract, index, diagnostics, and status +writers should read this typed material instead of scraping logs or +reconstructing public paths independently. diff --git a/docs/generated/pipeline_api.json b/docs/generated/pipeline_api.json index 5720f1509..f06355d9b 100644 --- a/docs/generated/pipeline_api.json +++ b/docs/generated/pipeline_api.json @@ -1228,6 +1228,40 @@ "signature": "def fitted_weights_spec_for_scope(scope: FitScope | str) -> FittedWeightsSpec", "source_file": "policyengine_us_data/fit_weights/specs.py" }, + "full_promotion_result": { + "docstring": "Typed result for a full Stage 5 release promotion transaction.", + "id": "full_promotion_result", + "kind": "class", + "line": 62, + "metadata": { + "api_refs": [ + "policyengine_us_data.release_promotion.results.FullPromotionResult", + "policyengine_us_data.release_promotion.results.full.FullPromotionResult" + ], + "artifacts_in": [ + "release promotion transaction output" + ], + "artifacts_out": [ + "typed promotion result" + ], + "description": "Typed Stage 5 result model for full release promotion outcomes.", + "id": "full_promotion_result", + "label": "FullPromotionResult", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/results/full.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_results.py" + ] + }, + "object_path": "policyengine_us_data.release_promotion.results.full.FullPromotionResult", + "signature": "class FullPromotionResult", + "source_file": "policyengine_us_data/release_promotion/results/full.py" + }, "geo_assign": { "docstring": "Assign random census block geography to cloned\nCPS records.\n\nEach of n_records * n_clones total records gets a\nrandom census block sampled from the global\npopulation-weighted distribution. State and CD are\nderived from the block GEOID.\n\nArgs:\n n_records: Number of households in the base CPS\n dataset.\n n_clones: Number of clones (default 10).\n seed: Random seed for reproducibility.\n fixed_state_fips: Optional state FIPS per base record. Positive\n values constrain every clone of that record to blocks in the\n requested state; zero or missing values remain unrestricted.\n\nReturns:\n GeographyAssignment with arrays of length\n n_records * n_clones.", "id": "geo_assign", @@ -3889,6 +3923,40 @@ "signature": "def validate_area(sim, targets_df: pd.DataFrame, engine, area_type: str, area_id: str, display_id: str, dataset_path: str, period: int, training_mask: np.ndarray, variable_entity_map: dict, constraints_map: Optional[dict] = None) -> list", "source_file": "policyengine_us_data/calibration/validate_staging.py" }, + "typed_full_release_promotion": { + "docstring": "Run the existing transaction engine and wrap its output in a typed result.", + "id": "typed_full_release_promotion", + "kind": "function", + "line": 171, + "metadata": { + "api_refs": [ + "policyengine_us_data.utils.release_promotion.promote_full_release_with_result" + ], + "artifacts_in": [ + "staged release artifacts", + "release manifest inputs" + ], + "artifacts_out": [ + "FullPromotionResult" + ], + "description": "Compatibility wrapper that returns typed Stage 5 promotion results from the existing transaction engine.", + "id": "typed_full_release_promotion", + "label": "Typed Full Release Promotion", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/utils/release_promotion.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_results.py" + ] + }, + "object_path": "policyengine_us_data.utils.release_promotion.promote_full_release_with_result", + "signature": "def promote_full_release_with_result(config: FullReleasePromotionConfig, deps: FullReleasePromotionDependencies) -> 'FullPromotionResult'", + "source_file": "policyengine_us_data/utils/release_promotion.py" + }, "unified_matrix_builder": { "docstring": "Build sparse calibration matrix for cloned CPS records.\n\nProcesses clone-by-clone: each clone's records get their\nassigned geography, are simulated, and the results fill\nthe corresponding columns.\n\nArgs:\n db_uri: SQLAlchemy database URI.\n time_period: Tax year for calibration (e.g. 2024).\n dataset_path: Path to the base extended CPS h5 file.", "id": "unified_matrix_builder", diff --git a/docs/generated/pipeline_map.json b/docs/generated/pipeline_map.json index 33bb47901..6b1d74980 100644 --- a/docs/generated/pipeline_map.json +++ b/docs/generated/pipeline_map.json @@ -344,6 +344,31 @@ "uv run pytest tests/unit/fit_weights/test_specs.py" ] }, + { + "api_refs": [ + "policyengine_us_data.release_promotion.results.FullPromotionResult", + "policyengine_us_data.release_promotion.results.full.FullPromotionResult" + ], + "artifacts_in": [ + "release promotion transaction output" + ], + "artifacts_out": [ + "typed promotion result" + ], + "description": "Typed Stage 5 result model for full release promotion outcomes.", + "id": "full_promotion_result", + "label": "FullPromotionResult", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/release_promotion/results/full.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_results.py" + ] + }, { "api_refs": [ "policyengine_us_data.datasets.cps.extended_cps.ExtendedCPS._validate_housing_assistance_microsimulation" @@ -1741,6 +1766,31 @@ "uv run pytest tests/unit/test_build_dataset_specs.py" ] }, + { + "api_refs": [ + "policyengine_us_data.utils.release_promotion.promote_full_release_with_result" + ], + "artifacts_in": [ + "staged release artifacts", + "release manifest inputs" + ], + "artifacts_out": [ + "FullPromotionResult" + ], + "description": "Compatibility wrapper that returns typed Stage 5 promotion results from the existing transaction engine.", + "id": "typed_full_release_promotion", + "label": "Typed Full Release Promotion", + "node_type": "library", + "pathways": [ + "5_validate_and_promote_release" + ], + "source_file": "policyengine_us_data/utils/release_promotion.py", + "stability": "moving", + "status": "transitional", + "validation_commands": [ + "uv run pytest tests/unit/release_promotion/test_results.py" + ] + }, { "api_refs": [ "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder" @@ -1966,9 +2016,9 @@ } ], "metadata": { - "api_node_count": 95, + "api_node_count": 97, "canonical_stage_count": 5, - "decorated_object_count": 144, + "decorated_object_count": 146, "mapped_decorated_node_count": 49, "stage_count": 17, "substage_count": 17 diff --git a/policyengine_us_data/release_promotion/__init__.py b/policyengine_us_data/release_promotion/__init__.py index c01b44134..ee64c5e5d 100644 --- a/policyengine_us_data/release_promotion/__init__.py +++ b/policyengine_us_data/release_promotion/__init__.py @@ -24,6 +24,15 @@ build_release_candidate_bundle_from_stage4_contract, read_stage4_release_candidate_bundle, ) +from .results import ( + CleanupPromotionResult, + CompletionMarkerPromotionResult, + FullPromotionResult, + GcsPromotionResult, + HuggingFacePromotionResult, + ReleaseManifestPromotionResult, + VersionManifestPromotionResult, +) from .validation import build_release_candidate_shape_report from .validation import ( DEFAULT_REQUIRED_RELEASE_ARTIFACT_FAMILIES, @@ -39,11 +48,18 @@ "BASE_RELEASE_ARTIFACT_PATHS", "DEFAULT_REQUIRED_RELEASE_ARTIFACT_FAMILIES", "RELEASE_VALIDATION_SUBSTAGE_ID", + "CleanupPromotionResult", + "CompletionMarkerPromotionResult", + "FullPromotionResult", + "GcsPromotionResult", + "HuggingFacePromotionResult", "ReleaseArtifactSpec", "ReleaseCandidateInputBundle", "ReleasePromotionContext", "ReleaseCandidateValidationDependencies", "ReleaseCandidateValidator", + "ReleaseManifestPromotionResult", + "VersionManifestPromotionResult", "VALIDATION_REPORT_POLICY_PRESENCE_ONLY", "VALIDATION_REPORT_POLICY_REQUIRE_PASSING", "build_legacy_release_candidate_bundle", diff --git a/policyengine_us_data/release_promotion/results/__init__.py b/policyengine_us_data/release_promotion/results/__init__.py new file mode 100644 index 000000000..761d14f55 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/__init__.py @@ -0,0 +1,33 @@ +"""Typed Stage 5 promotion result models.""" + +from .cleanup import ( + CLEANUP_STATUS_COMPLETED, + CLEANUP_STATUS_FAILED, + CLEANUP_STATUS_SKIPPED, + CLEANUP_STATUSES, + CleanupPromotionResult, +) +from .destinations import ( + GcsPromotionResult, + HuggingFacePromotionResult, +) +from .full import FullPromotionResult +from .manifests import ( + CompletionMarkerPromotionResult, + ReleaseManifestPromotionResult, + VersionManifestPromotionResult, +) + +__all__ = [ + "CLEANUP_STATUS_COMPLETED", + "CLEANUP_STATUS_FAILED", + "CLEANUP_STATUS_SKIPPED", + "CLEANUP_STATUSES", + "CleanupPromotionResult", + "CompletionMarkerPromotionResult", + "FullPromotionResult", + "GcsPromotionResult", + "HuggingFacePromotionResult", + "ReleaseManifestPromotionResult", + "VersionManifestPromotionResult", +] diff --git a/policyengine_us_data/release_promotion/results/_coercion.py b/policyengine_us_data/release_promotion/results/_coercion.py new file mode 100644 index 000000000..0b0d84801 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/_coercion.py @@ -0,0 +1,46 @@ +"""Shared coercion helpers for typed release-promotion results.""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any + +from policyengine_us_data.stage_contracts._coercion import require_non_empty + + +def nonnegative_int(value: Any, field_name: str) -> int: + """Return a non-negative integer or raise a contract validation error.""" + + if isinstance(value, bool) or not isinstance(value, int): + raise ValueError(f"{field_name} must be an integer") + if value < 0: + raise ValueError(f"{field_name} must be non-negative") + return value + + +def bool_value(value: Any, field_name: str) -> bool: + """Return a boolean or raise a contract validation error.""" + + if not isinstance(value, bool): + raise ValueError(f"{field_name} must be a boolean") + return value + + +def string_tuple(value: Any, field_name: str) -> tuple[str, ...]: + """Return a tuple of non-empty strings from a sequence value.""" + + if value is None: + return () + if isinstance(value, str) or not isinstance(value, Sequence): + raise ValueError(f"{field_name} must be a sequence of strings") + items = tuple(value) + for item in items: + require_non_empty(item, f"{field_name} item") + return items + + +def require_type(value: Any, field_name: str, expected_type: type) -> None: + """Validate a nested typed result object.""" + + if not isinstance(value, expected_type): + raise ValueError(f"{field_name} must be {expected_type.__name__}") diff --git a/policyengine_us_data/release_promotion/results/cleanup.py b/policyengine_us_data/release_promotion/results/cleanup.py new file mode 100644 index 000000000..1141c5414 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/cleanup.py @@ -0,0 +1,85 @@ +"""Typed cleanup result models for release promotion.""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Any + +from policyengine_us_data.release_promotion.results._coercion import ( + bool_value, + nonnegative_int, +) +from policyengine_us_data.stage_contracts._coercion import ( + schema_version, + validate_schema_version, +) +from policyengine_us_data.stage_contracts.constants import CONTRACT_SCHEMA_VERSION + +CLEANUP_STATUS_SKIPPED = "skipped" +CLEANUP_STATUS_COMPLETED = "completed" +CLEANUP_STATUS_FAILED = "failed" +CLEANUP_STATUSES = frozenset( + { + CLEANUP_STATUS_SKIPPED, + CLEANUP_STATUS_COMPLETED, + CLEANUP_STATUS_FAILED, + } +) + + +def cleanup_status(value: Any) -> str: + """Return a known cleanup status value.""" + + if value not in CLEANUP_STATUSES: + allowed = ", ".join(sorted(CLEANUP_STATUSES)) + raise ValueError(f"cleanup status must be one of: {allowed}") + return str(value) + + +@dataclass(frozen=True, kw_only=True) +class CleanupPromotionResult: + """Result for post-certification staging cleanup.""" + + cleaned_count: int + attempted: bool = True + status: str = CLEANUP_STATUS_COMPLETED + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__( + self, + "cleaned_count", + nonnegative_int(self.cleaned_count, "cleaned_count"), + ) + object.__setattr__(self, "attempted", bool_value(self.attempted, "attempted")) + object.__setattr__(self, "status", cleanup_status(self.status)) + if not self.attempted and self.status != CLEANUP_STATUS_SKIPPED: + raise ValueError("cleanup status must be skipped when attempted is false") + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "cleaned_count": self.cleaned_count, + "attempted": self.attempted, + "status": self.status, + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "CleanupPromotionResult": + """Restore a cleanup result from a mapping.""" + + return cls( + cleaned_count=nonnegative_int( + data.get("cleaned_count"), + "cleaned_count", + ), + attempted=bool_value(data.get("attempted", True), "attempted"), + status=cleanup_status( + data.get("status", CLEANUP_STATUS_COMPLETED), + ), + schema_version=schema_version(data), + ) diff --git a/policyengine_us_data/release_promotion/results/destinations.py b/policyengine_us_data/release_promotion/results/destinations.py new file mode 100644 index 000000000..57f0f7ca7 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/destinations.py @@ -0,0 +1,189 @@ +"""Typed destination result models for release promotion.""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Any + +from policyengine_us_data.release_promotion.results._coercion import ( + bool_value, + nonnegative_int, + string_tuple, +) +from policyengine_us_data.stage_contracts._coercion import ( + optional_string, + optional_string_value, + require_non_empty, + required_string, + schema_version, + validate_schema_version, +) +from policyengine_us_data.stage_contracts.constants import CONTRACT_SCHEMA_VERSION + + +@dataclass(frozen=True, kw_only=True) +class HuggingFacePromotionResult: + """Result for copying staged Hugging Face artifacts to production paths.""" + + repo_name: str + repo_type: str + source_staging_prefix: str + promoted_paths: tuple[str, ...] + promoted_count: int + commit_id: str | None = None + noop_paths: tuple[str, ...] = () + already_finalized: bool = False + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__( + self, "repo_name", require_non_empty(self.repo_name, "repo_name") + ) + object.__setattr__( + self, "repo_type", require_non_empty(self.repo_type, "repo_type") + ) + object.__setattr__( + self, + "source_staging_prefix", + require_non_empty(self.source_staging_prefix, "source_staging_prefix"), + ) + object.__setattr__( + self, + "promoted_paths", + string_tuple(self.promoted_paths, "promoted_paths"), + ) + object.__setattr__( + self, + "promoted_count", + nonnegative_int(self.promoted_count, "promoted_count"), + ) + object.__setattr__( + self, + "commit_id", + optional_string_value(self.commit_id, "commit_id"), + ) + object.__setattr__( + self, + "noop_paths", + string_tuple(self.noop_paths, "noop_paths"), + ) + object.__setattr__( + self, + "already_finalized", + bool_value(self.already_finalized, "already_finalized"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "repo_name": self.repo_name, + "repo_type": self.repo_type, + "source_staging_prefix": self.source_staging_prefix, + "promoted_paths": list(self.promoted_paths), + "promoted_count": self.promoted_count, + "commit_id": self.commit_id, + "noop_paths": list(self.noop_paths), + "already_finalized": self.already_finalized, + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "HuggingFacePromotionResult": + """Restore a Hugging Face promotion result from a mapping.""" + + return cls( + repo_name=required_string(data, "repo_name"), + repo_type=required_string(data, "repo_type"), + source_staging_prefix=required_string(data, "source_staging_prefix"), + promoted_paths=string_tuple(data.get("promoted_paths"), "promoted_paths"), + promoted_count=nonnegative_int( + data.get("promoted_count"), + "promoted_count", + ), + commit_id=optional_string(data, "commit_id"), + noop_paths=string_tuple(data.get("noop_paths"), "noop_paths"), + already_finalized=bool_value( + data.get("already_finalized", False), + "already_finalized", + ), + schema_version=schema_version(data), + ) + + +@dataclass(frozen=True, kw_only=True) +class GcsPromotionResult: + """Result for uploading staged Hugging Face artifacts to GCS.""" + + bucket_name: str + object_paths: tuple[str, ...] + release_version: str + uploaded_count: int + skipped_paths: tuple[str, ...] = () + failures: tuple[str, ...] = () + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__( + self, + "bucket_name", + require_non_empty(self.bucket_name, "bucket_name"), + ) + object.__setattr__( + self, + "object_paths", + string_tuple(self.object_paths, "object_paths"), + ) + object.__setattr__( + self, + "release_version", + require_non_empty(self.release_version, "release_version"), + ) + object.__setattr__( + self, + "uploaded_count", + nonnegative_int(self.uploaded_count, "uploaded_count"), + ) + object.__setattr__( + self, + "skipped_paths", + string_tuple(self.skipped_paths, "skipped_paths"), + ) + object.__setattr__( + self, + "failures", + string_tuple(self.failures, "failures"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "bucket_name": self.bucket_name, + "object_paths": list(self.object_paths), + "release_version": self.release_version, + "uploaded_count": self.uploaded_count, + "skipped_paths": list(self.skipped_paths), + "failures": list(self.failures), + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "GcsPromotionResult": + """Restore a GCS promotion result from a mapping.""" + + return cls( + bucket_name=required_string(data, "bucket_name"), + object_paths=string_tuple(data.get("object_paths"), "object_paths"), + release_version=required_string(data, "release_version"), + uploaded_count=nonnegative_int( + data.get("uploaded_count"), + "uploaded_count", + ), + skipped_paths=string_tuple(data.get("skipped_paths"), "skipped_paths"), + failures=string_tuple(data.get("failures"), "failures"), + schema_version=schema_version(data), + ) diff --git a/policyengine_us_data/release_promotion/results/full.py b/policyengine_us_data/release_promotion/results/full.py new file mode 100644 index 000000000..e4cd9ae02 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/full.py @@ -0,0 +1,294 @@ +"""Typed full release-promotion result aggregate.""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass, field +from typing import Any + +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.release_promotion.results._coercion import ( + bool_value, + nonnegative_int, + require_type, + string_tuple, +) +from policyengine_us_data.release_promotion.results.cleanup import ( + CLEANUP_STATUS_COMPLETED, + CLEANUP_STATUS_SKIPPED, + CleanupPromotionResult, + cleanup_status, +) +from policyengine_us_data.release_promotion.results.destinations import ( + GcsPromotionResult, + HuggingFacePromotionResult, +) +from policyengine_us_data.release_promotion.results.manifests import ( + CompletionMarkerPromotionResult, + ReleaseManifestPromotionResult, + VersionManifestPromotionResult, +) +from policyengine_us_data.stage_contracts._coercion import ( + freeze_mapping, + jsonable_value, + mapping_value, + optional_string, + require_non_empty, + required_string, + schema_version, + validate_schema_version, +) +from policyengine_us_data.stage_contracts.constants import CONTRACT_SCHEMA_VERSION + + +def _default_hf_staging_prefix(candidate_version: str, run_id: str) -> str: + return f"staging/{candidate_version}-{run_id}" + + +@pipeline_node( + id="full_promotion_result", + label="FullPromotionResult", + node_type="library", + description="Typed Stage 5 result model for full release promotion outcomes.", + status="transitional", + stability="moving", + pathways=["5_validate_and_promote_release"], + api_refs=["policyengine_us_data.release_promotion.results.FullPromotionResult"], + artifacts_in=["release promotion transaction output"], + artifacts_out=["typed promotion result"], + validation_commands=["uv run pytest tests/unit/release_promotion/test_results.py"], +) +@dataclass(frozen=True, kw_only=True) +class FullPromotionResult: + """Typed result for a full Stage 5 release promotion transaction.""" + + run_id: str + candidate_version: str + release_version: str + artifact_count: int + hf: HuggingFacePromotionResult + gcs: GcsPromotionResult + release_manifest: ReleaseManifestPromotionResult + version_manifest: VersionManifestPromotionResult + completion_marker: CompletionMarkerPromotionResult + cleanup: CleanupPromotionResult + already_finalized: bool = False + metadata: Mapping[str, Any] = field(default_factory=dict) + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__(self, "run_id", require_non_empty(self.run_id, "run_id")) + object.__setattr__( + self, + "candidate_version", + require_non_empty(self.candidate_version, "candidate_version"), + ) + object.__setattr__( + self, + "release_version", + require_non_empty(self.release_version, "release_version"), + ) + object.__setattr__( + self, + "artifact_count", + nonnegative_int(self.artifact_count, "artifact_count"), + ) + require_type(self.hf, "hf", HuggingFacePromotionResult) + require_type(self.gcs, "gcs", GcsPromotionResult) + require_type( + self.release_manifest, + "release_manifest", + ReleaseManifestPromotionResult, + ) + require_type( + self.version_manifest, + "version_manifest", + VersionManifestPromotionResult, + ) + require_type( + self.completion_marker, + "completion_marker", + CompletionMarkerPromotionResult, + ) + require_type(self.cleanup, "cleanup", CleanupPromotionResult) + object.__setattr__( + self, + "already_finalized", + bool_value(self.already_finalized, "already_finalized"), + ) + object.__setattr__( + self, + "metadata", + freeze_mapping(self.metadata, "metadata"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "run_id": self.run_id, + "candidate_version": self.candidate_version, + "release_version": self.release_version, + "artifact_count": self.artifact_count, + "hf": self.hf.to_dict(), + "gcs": self.gcs.to_dict(), + "release_manifest": self.release_manifest.to_dict(), + "version_manifest": self.version_manifest.to_dict(), + "completion_marker": self.completion_marker.to_dict(), + "cleanup": self.cleanup.to_dict(), + "already_finalized": self.already_finalized, + "metadata": jsonable_value(self.metadata), + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "FullPromotionResult": + """Restore a full promotion result from a mapping.""" + + return cls( + run_id=required_string(data, "run_id"), + candidate_version=required_string(data, "candidate_version"), + release_version=required_string(data, "release_version"), + artifact_count=nonnegative_int( + data.get("artifact_count"), + "artifact_count", + ), + hf=HuggingFacePromotionResult.from_dict(mapping_value(data, "hf")), + gcs=GcsPromotionResult.from_dict(mapping_value(data, "gcs")), + release_manifest=ReleaseManifestPromotionResult.from_dict( + mapping_value(data, "release_manifest"), + ), + version_manifest=VersionManifestPromotionResult.from_dict( + mapping_value(data, "version_manifest"), + ), + completion_marker=CompletionMarkerPromotionResult.from_dict( + mapping_value(data, "completion_marker"), + ), + cleanup=CleanupPromotionResult.from_dict(mapping_value(data, "cleanup")), + already_finalized=bool_value( + data.get("already_finalized", False), + "already_finalized", + ), + metadata=mapping_value(data, "metadata"), + schema_version=schema_version(data), + ) + + @classmethod + def from_legacy_dict(cls, data: Mapping[str, Any]) -> "FullPromotionResult": + """Build a typed result from the existing promotion dictionary output.""" + + run_id = required_string(data, "run_id") + candidate_version = required_string(data, "candidate_version") + release_version = required_string(data, "release_version") + rel_paths = string_tuple(data.get("rel_paths"), "rel_paths") + already_finalized = bool_value( + data.get("already_finalized", False), + "already_finalized", + ) + cleanup_attempted = bool_value( + data.get("staging_cleanup_attempted", True), + "staging_cleanup_attempted", + ) + cleanup_status_value = data.get("staging_cleanup_status") + if cleanup_status_value is None: + cleanup_status_value = ( + CLEANUP_STATUS_SKIPPED + if not cleanup_attempted + else CLEANUP_STATUS_COMPLETED + ) + + return cls( + run_id=run_id, + candidate_version=candidate_version, + release_version=release_version, + artifact_count=nonnegative_int( + data.get("artifact_count"), + "artifact_count", + ), + hf=HuggingFacePromotionResult( + repo_name=required_string(data, "hf_repo_name"), + repo_type=required_string(data, "hf_repo_type"), + source_staging_prefix=( + optional_string(data, "hf_staging_prefix") + or _default_hf_staging_prefix(candidate_version, run_id) + ), + promoted_paths=string_tuple( + data.get("hf_promoted_paths", rel_paths), + "hf_promoted_paths", + ), + promoted_count=nonnegative_int( + data.get("hf_promoted"), + "hf_promoted", + ), + commit_id=optional_string(data, "hf_commit_id"), + noop_paths=string_tuple( + data.get("hf_noop_paths", rel_paths if already_finalized else ()), + "hf_noop_paths", + ), + already_finalized=already_finalized, + ), + gcs=GcsPromotionResult( + bucket_name=required_string(data, "gcs_bucket_name"), + object_paths=string_tuple( + data.get("gcs_object_paths", rel_paths), + "gcs_object_paths", + ), + release_version=release_version, + uploaded_count=nonnegative_int( + data.get("gcs_uploaded"), + "gcs_uploaded", + ), + skipped_paths=string_tuple( + data.get( + "gcs_skipped_paths", + rel_paths if already_finalized else (), + ), + "gcs_skipped_paths", + ), + failures=string_tuple(data.get("gcs_failures"), "gcs_failures"), + ), + release_manifest=ReleaseManifestPromotionResult( + root_path=required_string(data, "release_manifest_path"), + versioned_path=required_string( + data, + "versioned_release_manifest_path", + ), + trace_tro_path=required_string(data, "trace_tro_path"), + versioned_trace_tro_path=required_string( + data, + "versioned_trace_tro_path", + ), + artifact_count=nonnegative_int( + data.get("release_manifest_artifacts"), + "release_manifest_artifacts", + ), + manifest_sha256=optional_string(data, "release_manifest_sha256"), + ), + version_manifest=VersionManifestPromotionResult( + path=required_string(data, "version_manifest_path"), + version=required_string(data, "version_manifest_version"), + updated=bool_value( + data.get("version_manifest_updated", not already_finalized), + "version_manifest_updated", + ), + current_version=( + optional_string(data, "version_manifest_current_version") + or release_version + ), + ), + completion_marker=CompletionMarkerPromotionResult( + marker_path=required_string(data, "release_completion_marker"), + tag=required_string(data, "release_completion_tag"), + valid=bool_value(data.get("release_completion_valid"), "valid"), + ), + cleanup=CleanupPromotionResult( + cleaned_count=nonnegative_int( + data.get("staging_cleaned"), + "staging_cleaned", + ), + attempted=cleanup_attempted, + status=cleanup_status(cleanup_status_value), + ), + already_finalized=already_finalized, + ) diff --git a/policyengine_us_data/release_promotion/results/manifests.py b/policyengine_us_data/release_promotion/results/manifests.py new file mode 100644 index 000000000..7f8f96380 --- /dev/null +++ b/policyengine_us_data/release_promotion/results/manifests.py @@ -0,0 +1,184 @@ +"""Typed manifest result models for release promotion.""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from typing import Any + +from policyengine_us_data.release_promotion.results._coercion import ( + bool_value, + nonnegative_int, +) +from policyengine_us_data.stage_contracts._coercion import ( + optional_string, + optional_string_value, + require_non_empty, + required_string, + schema_version, + validate_schema_version, +) +from policyengine_us_data.stage_contracts.constants import CONTRACT_SCHEMA_VERSION + + +@dataclass(frozen=True, kw_only=True) +class ReleaseManifestPromotionResult: + """Result for writing the release manifest and TRACE TRO artifacts.""" + + root_path: str + versioned_path: str + trace_tro_path: str + versioned_trace_tro_path: str + artifact_count: int + manifest_sha256: str | None = None + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__( + self, "root_path", require_non_empty(self.root_path, "root_path") + ) + object.__setattr__( + self, + "versioned_path", + require_non_empty(self.versioned_path, "versioned_path"), + ) + object.__setattr__( + self, + "trace_tro_path", + require_non_empty(self.trace_tro_path, "trace_tro_path"), + ) + object.__setattr__( + self, + "versioned_trace_tro_path", + require_non_empty( + self.versioned_trace_tro_path, + "versioned_trace_tro_path", + ), + ) + object.__setattr__( + self, + "artifact_count", + nonnegative_int(self.artifact_count, "artifact_count"), + ) + object.__setattr__( + self, + "manifest_sha256", + optional_string_value(self.manifest_sha256, "manifest_sha256"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "root_path": self.root_path, + "versioned_path": self.versioned_path, + "trace_tro_path": self.trace_tro_path, + "versioned_trace_tro_path": self.versioned_trace_tro_path, + "artifact_count": self.artifact_count, + "manifest_sha256": self.manifest_sha256, + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "ReleaseManifestPromotionResult": + """Restore a release manifest result from a mapping.""" + + return cls( + root_path=required_string(data, "root_path"), + versioned_path=required_string(data, "versioned_path"), + trace_tro_path=required_string(data, "trace_tro_path"), + versioned_trace_tro_path=required_string(data, "versioned_trace_tro_path"), + artifact_count=nonnegative_int( + data.get("artifact_count"), + "artifact_count", + ), + manifest_sha256=optional_string(data, "manifest_sha256"), + schema_version=schema_version(data), + ) + + +@dataclass(frozen=True, kw_only=True) +class VersionManifestPromotionResult: + """Result for updating the public version manifest.""" + + path: str + version: str + updated: bool + current_version: str | None = None + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__(self, "path", require_non_empty(self.path, "path")) + object.__setattr__(self, "version", require_non_empty(self.version, "version")) + object.__setattr__(self, "updated", bool_value(self.updated, "updated")) + object.__setattr__( + self, + "current_version", + optional_string_value(self.current_version, "current_version"), + ) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "path": self.path, + "version": self.version, + "updated": self.updated, + "current_version": self.current_version, + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "VersionManifestPromotionResult": + """Restore a version manifest result from a mapping.""" + + return cls( + path=required_string(data, "path"), + version=required_string(data, "version"), + updated=bool_value(data.get("updated"), "updated"), + current_version=optional_string(data, "current_version"), + schema_version=schema_version(data), + ) + + +@dataclass(frozen=True, kw_only=True) +class CompletionMarkerPromotionResult: + """Result for writing or verifying the release completion marker.""" + + marker_path: str + tag: str + valid: bool + schema_version: str = CONTRACT_SCHEMA_VERSION + + def __post_init__(self) -> None: + validate_schema_version(self.schema_version, self.__class__.__name__) + object.__setattr__( + self, + "marker_path", + require_non_empty(self.marker_path, "marker_path"), + ) + object.__setattr__(self, "tag", require_non_empty(self.tag, "tag")) + object.__setattr__(self, "valid", bool_value(self.valid, "valid")) + + def to_dict(self) -> dict[str, Any]: + """Serialize this result to JSON-compatible primitives.""" + + return { + "marker_path": self.marker_path, + "tag": self.tag, + "valid": self.valid, + "schema_version": self.schema_version, + } + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "CompletionMarkerPromotionResult": + """Restore a completion marker result from a mapping.""" + + return cls( + marker_path=required_string(data, "marker_path"), + tag=required_string(data, "tag"), + valid=bool_value(data.get("valid"), "valid"), + schema_version=schema_version(data), + ) diff --git a/policyengine_us_data/utils/release_promotion.py b/policyengine_us_data/utils/release_promotion.py index def0b4cea..5650b29de 100644 --- a/policyengine_us_data/utils/release_promotion.py +++ b/policyengine_us_data/utils/release_promotion.py @@ -9,13 +9,23 @@ import logging from dataclasses import dataclass from pathlib import Path -from typing import Any, Callable, Sequence +from typing import TYPE_CHECKING, Any, Callable, Sequence +from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.utils.release_completion import release_completion_marker_path +if TYPE_CHECKING: + from policyengine_us_data.release_promotion import FullPromotionResult + ManifestFile = tuple[Path, str] ReleaseManifest = dict[str, Any] +RELEASE_MANIFEST_PATH = "release_manifest.json" +TRACE_TRO_PATH = "trace.tro.jsonld" +VERSION_MANIFEST_PATH = "version_manifest.json" +CLEANUP_STATUS_SKIPPED = "skipped" +CLEANUP_STATUS_COMPLETED = "completed" +CLEANUP_STATUS_FAILED = "failed" @dataclass(frozen=True) @@ -120,7 +130,7 @@ def promote_full_release( deps=deps, ) - cleaned = _cleanup_staging_after_release( + cleaned, cleanup_status = _cleanup_staging_after_release( config=config, rel_paths=rel_paths, deps=deps, @@ -128,6 +138,7 @@ def promote_full_release( ) return { + **_promotion_identity_result_fields(config, rel_paths), "run_id": config.run_id, "candidate_version": config.candidate_version, "release_version": config.release_version, @@ -135,11 +146,39 @@ def promote_full_release( "hf_promoted": promoted_hf, "gcs_uploaded": uploaded_gcs, "release_manifest_artifacts": len(release_manifest["artifacts"]), + "version_manifest_updated": True, "release_completion_marker": completion_marker.get("marker_path"), + "release_completion_tag": config.release_version, + "release_completion_valid": True, "staging_cleaned": cleaned, + "staging_cleanup_attempted": config.cleanup_staging, + "staging_cleanup_status": cleanup_status, } +@pipeline_node( + id="typed_full_release_promotion", + label="Typed Full Release Promotion", + node_type="library", + description="Compatibility wrapper that returns typed Stage 5 promotion results from the existing transaction engine.", + status="transitional", + stability="moving", + pathways=["5_validate_and_promote_release"], + artifacts_in=["staged release artifacts", "release manifest inputs"], + artifacts_out=["FullPromotionResult"], + validation_commands=["uv run pytest tests/unit/release_promotion/test_results.py"], +) +def promote_full_release_with_result( + config: FullReleasePromotionConfig, + deps: FullReleasePromotionDependencies, +) -> "FullPromotionResult": + """Run the existing transaction engine and wrap its output in a typed result.""" + + from policyengine_us_data.release_promotion import FullPromotionResult + + return FullPromotionResult.from_legacy_dict(promote_full_release(config, deps)) + + def _validated_release_paths( config: FullReleasePromotionConfig, deps: FullReleasePromotionDependencies, @@ -215,13 +254,18 @@ def _finish_already_finalized_release( config=config, deps=deps, ) - cleaned = _cleanup_staging_after_release( + cleaned, cleanup_status = _cleanup_staging_after_release( config=config, rel_paths=rel_paths, deps=deps, warning="Release %s was already finalized, but staging cleanup failed.", ) return { + **_promotion_identity_result_fields( + config, + rel_paths, + already_finalized=True, + ), "run_id": config.run_id, "candidate_version": config.candidate_version, "release_version": config.release_version, @@ -229,8 +273,13 @@ def _finish_already_finalized_release( "hf_promoted": 0, "gcs_uploaded": 0, "release_manifest_artifacts": len(finalized_manifest["artifacts"]), + "version_manifest_updated": False, "release_completion_marker": completion_marker_path, + "release_completion_tag": config.release_version, + "release_completion_valid": True, "staging_cleaned": cleaned, + "staging_cleanup_attempted": config.cleanup_staging, + "staging_cleanup_status": cleanup_status, "already_finalized": True, } @@ -342,25 +391,64 @@ def _cleanup_staging_after_release( rel_paths: Sequence[str], deps: FullReleasePromotionDependencies, warning: str, -) -> int: +) -> tuple[int, str]: if not config.cleanup_staging: - return 0 + return 0, CLEANUP_STATUS_SKIPPED cleanup_paths = deps.dedupe_preserving_order( [*rel_paths, *config.extra_cleanup_paths] ) try: - return deps.cleanup_staging_hf( + cleaned = deps.cleanup_staging_hf( cleanup_paths, candidate_version=config.candidate_version, hf_repo_name=config.hf_repo_name, hf_repo_type=config.hf_repo_type, run_id=config.run_id, ) + return cleaned, CLEANUP_STATUS_COMPLETED except Exception: logging.warning( warning, config.release_version, exc_info=True, ) - return 0 + return 0, CLEANUP_STATUS_FAILED + + +def _promotion_identity_result_fields( + config: FullReleasePromotionConfig, + rel_paths: Sequence[str], + *, + already_finalized: bool = False, +) -> dict[str, Any]: + paths = tuple(rel_paths) + versioned_release_manifest_path = ( + f"releases/{config.release_version}/{RELEASE_MANIFEST_PATH}" + ) + versioned_trace_tro_path = f"releases/{config.release_version}/{TRACE_TRO_PATH}" + return { + "rel_paths": paths, + "hf_repo_name": config.hf_repo_name, + "hf_repo_type": config.hf_repo_type, + "hf_staging_prefix": _hf_staging_prefix(config), + "hf_promoted_paths": paths, + "hf_commit_id": None, + "hf_noop_paths": paths if already_finalized else (), + "gcs_bucket_name": config.gcs_bucket_name, + "gcs_object_paths": paths, + "gcs_skipped_paths": paths if already_finalized else (), + "gcs_failures": (), + "release_manifest_path": RELEASE_MANIFEST_PATH, + "versioned_release_manifest_path": versioned_release_manifest_path, + "trace_tro_path": TRACE_TRO_PATH, + "versioned_trace_tro_path": versioned_trace_tro_path, + "release_manifest_sha256": None, + "version_manifest_path": VERSION_MANIFEST_PATH, + "version_manifest_version": config.release_version, + "version_manifest_current_version": config.release_version, + } + + +def _hf_staging_prefix(config: FullReleasePromotionConfig) -> str: + return f"staging/{config.candidate_version}-{config.run_id}" diff --git a/tests/unit/release_promotion/test_results.py b/tests/unit/release_promotion/test_results.py new file mode 100644 index 000000000..52f62d4e2 --- /dev/null +++ b/tests/unit/release_promotion/test_results.py @@ -0,0 +1,377 @@ +from pathlib import Path + +import pytest + +from policyengine_us_data.release_promotion import FullPromotionResult +from policyengine_us_data.utils.release_promotion import ( + FullReleasePromotionConfig, + FullReleasePromotionDependencies, + promote_full_release_with_result, +) + +_RELEASE_MANIFEST_PATH = "release_manifest.json" +_TRACE_TRO_PATH = "trace.tro.jsonld" +_VERSION_MANIFEST_PATH = "version_manifest.json" + + +def _make_files(tmp_path, rel_paths): + files = [] + for rel_path in rel_paths: + path = tmp_path / rel_path + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(rel_path, encoding="utf-8") + files.append((path, rel_path)) + return tuple(files) + + +class FakeFullReleasePromotionDependencies: + def __init__( + self, + *, + finalized_manifest=None, + marker_exists=True, + missing_staged_artifacts=(), + preflight_result=(True, []), + cleanup_error: Exception | None = None, + ) -> None: + self.finalized_manifest = finalized_manifest + self.marker_exists = marker_exists + self.missing_staged_artifacts = tuple(missing_staged_artifacts) + self.preflight_result = preflight_result + self.cleanup_error = cleanup_error + self.calls = [] + + def as_dependencies(self) -> FullReleasePromotionDependencies: + return FullReleasePromotionDependencies( + dedupe_preserving_order=self.dedupe_preserving_order, + download_staged_artifacts_for_manifest=( + self.download_staged_artifacts_for_manifest + ), + get_matching_finalized_release_manifest=( + self.get_matching_finalized_release_manifest + ), + list_missing_staged_artifacts=self.list_missing_staged_artifacts, + preflight_release_manifest_publish=self.preflight_release_manifest_publish, + promote_staging_to_production_hf=self.promote_staging_to_production_hf, + upload_from_hf_staging_to_gcs=self.upload_from_hf_staging_to_gcs, + publish_release_manifest_to_hf=self.publish_release_manifest_to_hf, + upload_final_version_manifest=self.upload_final_version_manifest, + upload_release_completion_marker=self.upload_release_completion_marker, + release_completion_marker_exists=self.release_completion_marker_exists, + cleanup_staging_hf=self.cleanup_staging_hf, + ) + + def dedupe_preserving_order(self, paths): + seen = set() + deduped = [] + for path in paths: + if path not in seen: + seen.add(path) + deduped.append(path) + return deduped + + def download_staged_artifacts_for_manifest(self, *args, **kwargs): + self.calls.append("download") + return [] + + def get_matching_finalized_release_manifest(self, *args, **kwargs): + self.calls.append("check_finalized") + return self.finalized_manifest + + def list_missing_staged_artifacts(self, *args, **kwargs): + self.calls.append("validate_staging") + return list(self.missing_staged_artifacts) + + def preflight_release_manifest_publish(self, *args, **kwargs): + self.calls.append("preflight_manifest") + return self.preflight_result + + def promote_staging_to_production_hf(self, paths, *args, **kwargs): + self.calls.append("promote_hf") + return len(paths) + + def upload_from_hf_staging_to_gcs(self, paths, *args, **kwargs): + self.calls.append("upload_gcs") + return len(paths) + + def publish_release_manifest_to_hf(self, files_with_paths, *args, **kwargs): + self.calls.append("release_manifest") + return { + "artifacts": { + Path(repo_path).with_suffix("").as_posix(): { + "path": repo_path, + "sha256": f"sha256:{repo_path}", + } + for _, repo_path in files_with_paths + } + } + + def upload_final_version_manifest(self, *args, **kwargs): + self.calls.append("version_manifest") + + def upload_release_completion_marker(self, *args, **kwargs): + self.calls.append("release_complete") + return {"marker_path": "releases/1.73.0/release-complete.json"} + + def release_completion_marker_exists(self, *args, **kwargs): + self.calls.append("check_marker") + return self.marker_exists + + def cleanup_staging_hf(self, paths, *args, **kwargs): + self.calls.append("cleanup_staging") + if self.cleanup_error is not None: + raise self.cleanup_error + return len(paths) + + +def _config( + rel_paths, + files_with_paths, + *, + cleanup_staging=True, +) -> FullReleasePromotionConfig: + return FullReleasePromotionConfig( + rel_paths=rel_paths, + candidate_version="1.73.0rc1", + release_version="1.73.0", + run_id="run-123", + files_with_paths=files_with_paths, + extra_cleanup_paths=("_run_context.json",), + cleanup_staging=cleanup_staging, + ) + + +def _legacy_result_payload(**overrides): + payload = { + "run_id": "run-123", + "candidate_version": "1.73.0rc1", + "release_version": "1.73.0", + "rel_paths": ("cps_2024.h5", "states/AL.h5"), + "hf_repo_name": "policyengine/policyengine-us-data", + "hf_repo_type": "model", + "hf_staging_prefix": "staging/1.73.0rc1-run-123", + "hf_promoted": 2, + "hf_promoted_paths": ("cps_2024.h5", "states/AL.h5"), + "hf_commit_id": None, + "hf_noop_paths": (), + "gcs_bucket_name": "policyengine-us-data", + "gcs_uploaded": 2, + "gcs_object_paths": ("cps_2024.h5", "states/AL.h5"), + "gcs_skipped_paths": (), + "gcs_failures": (), + "release_manifest_path": _RELEASE_MANIFEST_PATH, + "versioned_release_manifest_path": ("releases/1.73.0/release_manifest.json"), + "trace_tro_path": _TRACE_TRO_PATH, + "versioned_trace_tro_path": "releases/1.73.0/trace.tro.jsonld", + "release_manifest_sha256": None, + "release_manifest_artifacts": 2, + "version_manifest_path": _VERSION_MANIFEST_PATH, + "version_manifest_version": "1.73.0", + "version_manifest_current_version": "1.73.0", + "version_manifest_updated": True, + "release_completion_marker": "releases/1.73.0/release-complete.json", + "release_completion_tag": "1.73.0", + "release_completion_valid": True, + "staging_cleaned": 3, + "staging_cleanup_attempted": True, + "staging_cleanup_status": "completed", + } + payload.update(overrides) + return payload + + +def test_full_promotion_result_wraps_legacy_dict() -> None: + result = FullPromotionResult.from_legacy_dict( + _legacy_result_payload(artifact_count=2) + ) + + assert result.hf.repo_name == "policyengine/policyengine-us-data" + assert result.hf.repo_type == "model" + assert result.hf.source_staging_prefix == "staging/1.73.0rc1-run-123" + assert result.hf.promoted_paths == ("cps_2024.h5", "states/AL.h5") + assert result.hf.promoted_count == 2 + assert result.hf.commit_id is None + assert result.hf.noop_paths == () + assert result.gcs.bucket_name == "policyengine-us-data" + assert result.gcs.object_paths == ("cps_2024.h5", "states/AL.h5") + assert result.gcs.release_version == "1.73.0" + assert result.gcs.uploaded_count == 2 + assert result.gcs.skipped_paths == () + assert result.release_manifest.root_path == _RELEASE_MANIFEST_PATH + assert result.release_manifest.versioned_path == ( + "releases/1.73.0/release_manifest.json" + ) + assert result.release_manifest.trace_tro_path == _TRACE_TRO_PATH + assert result.release_manifest.versioned_trace_tro_path == ( + "releases/1.73.0/trace.tro.jsonld" + ) + assert result.release_manifest.artifact_count == 2 + assert result.version_manifest.path == _VERSION_MANIFEST_PATH + assert result.version_manifest.version == "1.73.0" + assert result.version_manifest.updated is True + assert result.version_manifest.current_version == "1.73.0" + assert result.completion_marker.marker_path == ( + "releases/1.73.0/release-complete.json" + ) + assert result.completion_marker.tag == "1.73.0" + assert result.completion_marker.valid is True + assert result.cleanup.cleaned_count == 3 + assert result.cleanup.status == "completed" + assert FullPromotionResult.from_dict(result.to_dict()) == result + + +def test_promote_full_release_with_result_preserves_transaction_order(tmp_path) -> None: + rel_paths = ("cps_2024.h5", "states/AL.h5", "national/US.h5") + files = _make_files(tmp_path, rel_paths) + fake_deps = FakeFullReleasePromotionDependencies() + + result = promote_full_release_with_result( + _config(rel_paths, files), + fake_deps.as_dependencies(), + ) + + assert fake_deps.calls == [ + "check_finalized", + "validate_staging", + "preflight_manifest", + "promote_hf", + "upload_gcs", + "release_manifest", + "version_manifest", + "release_complete", + "cleanup_staging", + ] + assert isinstance(result, FullPromotionResult) + assert result.run_id == "run-123" + assert result.artifact_count == 3 + assert result.hf.repo_name == "policyengine/policyengine-us-data" + assert result.hf.repo_type == "model" + assert result.hf.source_staging_prefix == "staging/1.73.0rc1-run-123" + assert result.hf.promoted_paths == rel_paths + assert result.hf.promoted_count == 3 + assert result.hf.noop_paths == () + assert result.gcs.bucket_name == "policyengine-us-data" + assert result.gcs.object_paths == rel_paths + assert result.gcs.release_version == "1.73.0" + assert result.gcs.uploaded_count == 3 + assert result.release_manifest.root_path == _RELEASE_MANIFEST_PATH + assert result.release_manifest.versioned_path == ( + "releases/1.73.0/release_manifest.json" + ) + assert result.release_manifest.trace_tro_path == _TRACE_TRO_PATH + assert result.release_manifest.versioned_trace_tro_path == ( + "releases/1.73.0/trace.tro.jsonld" + ) + assert result.release_manifest.artifact_count == 3 + assert result.version_manifest.path == _VERSION_MANIFEST_PATH + assert result.version_manifest.version == "1.73.0" + assert result.version_manifest.updated is True + assert result.version_manifest.current_version == "1.73.0" + assert result.completion_marker.tag == "1.73.0" + assert result.completion_marker.valid is True + assert result.cleanup.cleaned_count == 4 + assert result.cleanup.status == "completed" + assert result.already_finalized is False + + +def test_promote_full_release_with_result_handles_already_finalized(tmp_path) -> None: + rel_paths = ("states/AL.h5",) + files = _make_files(tmp_path, rel_paths) + fake_deps = FakeFullReleasePromotionDependencies( + finalized_manifest={"artifacts": {"states/AL": {"path": "states/AL.h5"}}}, + marker_exists=True, + ) + + result = promote_full_release_with_result( + _config(rel_paths, files), + fake_deps.as_dependencies(), + ) + + assert fake_deps.calls == ["check_finalized", "check_marker", "cleanup_staging"] + assert result.already_finalized is True + assert result.hf.already_finalized is True + assert result.hf.promoted_paths == rel_paths + assert result.hf.noop_paths == rel_paths + assert result.hf.promoted_count == 0 + assert result.gcs.object_paths == rel_paths + assert result.gcs.skipped_paths == rel_paths + assert result.gcs.uploaded_count == 0 + assert result.release_manifest.artifact_count == 1 + assert result.version_manifest.updated is False + assert result.version_manifest.current_version == "1.73.0" + assert result.completion_marker.marker_path == ( + "releases/1.73.0/release-complete.json" + ) + assert result.completion_marker.valid is True + + +def test_promote_full_release_with_result_represents_cleanup_failure(tmp_path) -> None: + rel_paths = ("states/AL.h5",) + files = _make_files(tmp_path, rel_paths) + fake_deps = FakeFullReleasePromotionDependencies( + cleanup_error=RuntimeError("cleanup unavailable"), + ) + + result = promote_full_release_with_result( + _config(rel_paths, files), + fake_deps.as_dependencies(), + ) + + assert "cleanup_staging" in fake_deps.calls + assert result.cleanup.attempted is True + assert result.cleanup.status == "failed" + assert result.cleanup.cleaned_count == 0 + assert result.hf.promoted_count == 1 + assert result.gcs.uploaded_count == 1 + + +def test_promote_full_release_with_result_represents_skipped_cleanup(tmp_path) -> None: + rel_paths = ("states/AL.h5",) + files = _make_files(tmp_path, rel_paths) + fake_deps = FakeFullReleasePromotionDependencies() + + result = promote_full_release_with_result( + _config(rel_paths, files, cleanup_staging=False), + fake_deps.as_dependencies(), + ) + + assert "cleanup_staging" not in fake_deps.calls + assert result.cleanup.attempted is False + assert result.cleanup.status == "skipped" + assert result.cleanup.cleaned_count == 0 + + +def test_cleanup_result_rejects_ambiguous_status() -> None: + payload = _legacy_result_payload( + artifact_count=2, + staging_cleanup_status="unknown", + ) + + with pytest.raises(ValueError, match="cleanup status"): + FullPromotionResult.from_legacy_dict(payload) + + +def test_full_promotion_result_requires_completion_marker_path() -> None: + payload = _legacy_result_payload( + artifact_count=2, + release_completion_marker=None, + ) + + with pytest.raises(ValueError, match="release_completion_marker"): + FullPromotionResult.from_legacy_dict(payload) + + +def test_promote_full_release_with_result_fails_before_public_writes(tmp_path) -> None: + rel_paths = ("states/AL.h5",) + files = _make_files(tmp_path, rel_paths) + fake_deps = FakeFullReleasePromotionDependencies( + missing_staged_artifacts=("staging/1.73.0rc1-run-123/states/AL.h5",), + ) + + with pytest.raises(FileNotFoundError, match="Missing staged release artifacts"): + promote_full_release_with_result( + _config(rel_paths, files), + fake_deps.as_dependencies(), + ) + + assert fake_deps.calls == ["check_finalized", "validate_staging"]