Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1037.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the Stage 5 release promotion contract and runtime manifest output.
8 changes: 8 additions & 0 deletions docs/engineering/pipeline-map.md
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,14 @@ class ReleasePromotionContext

Canonical run, candidate, release, and destination identity for Stage 5.

### `policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder`

```python
class ReleasePromotionContractBuilder
```

Build a Stage 5 contract from candidate identity and promotion results.

### `modal_app.local_area._resolve_scope_fingerprint`

```python
Expand Down
26 changes: 26 additions & 0 deletions docs/engineering/stages/release_promotion.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,29 @@ cleanup `status` as `skipped`, `completed`, or `failed` on
`CleanupPromotionResult`. Later contract, index, diagnostics, and status
writers should read this typed material instead of scraping logs or
reconstructing public paths independently.

## Release Promotion Contract

Stage 5 writes `release_promotion_contract.json` under the run-local
`diagnostics/contracts/` directory after the promotion transaction succeeds and
before the Stage 5 step manifest is completed. The contract is the semantic
record for the Stage 5 boundary: it ties the canonical `run_id`, candidate
identity, Stage 4 output contract reference when available, validation report
paths, public Hugging Face and GCS refs, cleanup status, and typed
`FullPromotionResult` into one durable `StageContract`.

The contract complements the public release files instead of replacing them:

- `release_manifest.json` and `releases/{version}/release_manifest.json` remain
the public artifact inventory for the stable release.
- `version_manifest.json` remains the public version registry used by clients
and publication checks.
- `releases/{version}/release-complete.json` remains the final completion
marker and tag target proving the release was fully finalized.
- `release_promotion_contract.json` remains run-scoped diagnostics material for
dashboards, AI agents, rerun comparison, and promotion auditability.

Runtime step manifests for `5_validate_and_promote_release` should include the
contract as a JSON `contract` output. They may still record legacy validated
input artifacts for compatibility, but the contract is the preferred semantic
entry point for Stage 5 status and lineage.
42 changes: 38 additions & 4 deletions docs/generated/pipeline_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,7 @@
"docstring": "Typed result for a full Stage 5 release promotion transaction.",
"id": "full_promotion_result",
"kind": "class",
"line": 62,
"line": 63,
"metadata": {
"api_refs": [
"policyengine_us_data.release_promotion.results.FullPromotionResult",
Expand Down Expand Up @@ -3086,7 +3086,7 @@
"docstring": "Promote a completed pipeline run to production.\n\n1. Verify run status is \"completed\"\n2. Promote every staged artifact in one Hugging Face commit\n3. Upload/copy every artifact to GCS\n4. Finalize release_manifest.json, tag the release, and update\n version_manifest.json\n5. Update run status to \"promoted\"\n\nArgs:\n run_id: The run ID to promote.\n candidate_version: Candidate staging scope used for staged source files.\n release_version: Stable version used for final release metadata.\n\nReturns:\n Summary message.",
"id": "promote_pipeline_run",
"kind": "function",
"line": 1910,
"line": 2079,
"metadata": {
"api_refs": [
"modal_app.pipeline.promote_run"
Expand Down Expand Up @@ -3347,6 +3347,40 @@
"signature": "class ReleasePromotionContext",
"source_file": "policyengine_us_data/release_promotion/context.py"
},
"release_promotion_contract_builder": {
"docstring": "Build a Stage 5 contract from candidate identity and promotion results.",
"id": "release_promotion_contract_builder",
"kind": "class",
"line": 71,
"metadata": {
"api_refs": [
"policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder"
],
"artifacts_in": [
"release candidate bundle",
"typed promotion result"
],
"artifacts_out": [
"release_promotion_contract.json"
],
"description": "Build the canonical Stage 5 release promotion contract.",
"id": "release_promotion_contract_builder",
"label": "ReleasePromotionContractBuilder",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/contract.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_contract.py"
]
},
"object_path": "policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder",
"signature": "class ReleasePromotionContractBuilder",
"source_file": "policyengine_us_data/release_promotion/contract.py"
},
"resolve_scope_fingerprint": {
"docstring": "Compute the scope fingerprint while preserving pinned resume values.",
"id": "resolve_scope_fingerprint",
Expand Down Expand Up @@ -3507,7 +3541,7 @@
"docstring": "Run the full pipeline end-to-end.\n\nArgs:\n branch: Git branch to build from.\n gpu: GPU type for regional calibration.\n epochs: Training epochs for regional calibration.\n national_gpu: GPU type for national calibration.\n national_epochs: Training epochs for national.\n num_workers: Number of parallel H5 workers.\n n_clones: Number of clones for H5 building.\n skip_national: Skip national calibration/H5.\n resume_run_id: Resume a previously failed run.\n clear_checkpoints: Wipe ALL checkpoints before building\n (default False). Normally not needed \u2014 checkpoints are\n scoped by commit SHA, so stale ones from other commits\n are cleaned automatically. Use True only to force a\n full rebuild of the current commit.\n candidate_version: Candidate staging scope used for HF staging.\n release_version: Final stable release version. Usually empty until\n promotion.\n base_release_version: Stable release current when this candidate was\n built.\n release_bump: Intended SemVer bump for this candidate.\n sha_override: Exact source SHA deployed by GitHub Actions. When\n provided, this is recorded instead of reading the current\n branch tip.\n run_id: Cross-system run ID created by GitHub.\n run_context: Serialized run context from the launcher workflow.\n modal_app_name: Deployed Modal app name for this run.\n modal_environment: Modal environment used for this run.\n chunked_matrix: Build the calibration matrix in clone-household\n chunks instead of the non-chunked path. Opt-in; default off.\n chunk_size: Clone-household columns per chunk when\n ``chunked_matrix`` is True.\n parallel_matrix: Fan chunked matrix building across Modal\n workers via ``build_matrix_chunk_worker``. Only meaningful\n when ``chunked_matrix`` is True; ignored otherwise.\n num_matrix_workers: Number of Modal workers when\n ``parallel_matrix`` is True.\n\nReturns:\n The run ID for use with promote.",
"id": "run_modal_pipeline",
"kind": "function",
"line": 943,
"line": 1112,
"metadata": {
"api_refs": [
"modal_app.pipeline.run_pipeline"
Expand Down Expand Up @@ -4387,7 +4421,7 @@
"docstring": "Verify deployed-image imports and subprocess seams.",
"id": "verify_runtime_seams",
"kind": "function",
"line": 569,
"line": 738,
"metadata": {
"api_refs": [
"modal_app.pipeline.verify_runtime_seams"
Expand Down
29 changes: 27 additions & 2 deletions docs/generated/pipeline_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,31 @@
"uv run pytest tests/unit/release_promotion/test_candidate.py"
]
},
{
"api_refs": [
"policyengine_us_data.release_promotion.contract.ReleasePromotionContractBuilder"
],
"artifacts_in": [
"release candidate bundle",
"typed promotion result"
],
"artifacts_out": [
"release_promotion_contract.json"
],
"description": "Build the canonical Stage 5 release promotion contract.",
"id": "release_promotion_contract_builder",
"label": "ReleasePromotionContractBuilder",
"node_type": "library",
"pathways": [
"5_validate_and_promote_release"
],
"source_file": "policyengine_us_data/release_promotion/contract.py",
"stability": "moving",
"status": "transitional",
"validation_commands": [
"uv run pytest tests/unit/release_promotion/test_contract.py"
]
},
{
"api_refs": [
"policyengine_us_data.build_outputs.fingerprinting.FingerprintingService",
Expand Down Expand Up @@ -1971,9 +1996,9 @@
}
],
"metadata": {
"api_node_count": 95,
"api_node_count": 96,
"canonical_stage_count": 5,
"decorated_object_count": 153,
"decorated_object_count": 154,
"mapped_decorated_node_count": 58,
"stage_count": 17,
"substage_count": 17
Expand Down
183 changes: 181 additions & 2 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,175 @@ def _promote_full_release_from_staging(
)


def _promotion_result_from_stdout(promotion_stdout: str):
"""Parse typed promotion results from the promotion subprocess output."""

from policyengine_us_data.release_promotion import parse_full_promotion_result_json

try:
return parse_full_promotion_result_json(promotion_stdout)
except ValueError as exc:
raise RuntimeError(
"Full release promotion subprocess did not return a valid "
"typed promotion result."
) from exc


def _release_promotion_context_from_run_context(run_context: RunContext):
"""Build the Stage 5 library context from the orchestration run context."""

from policyengine_us_data.release_promotion import ReleasePromotionContext

return ReleasePromotionContext(
run_id=run_context.run_id,
candidate_version=run_context.candidate_version,
release_version=run_context.release_version,
hf_repo_name="policyengine/policyengine-us-data",
gcs_bucket_name="policyengine-us-data",
base_release_version=run_context.base_release_version or None,
release_bump=run_context.release_bump or None,
modal_app_name=run_context.modal_app_name or None,
modal_environment=run_context.modal_environment or None,
hf_staging_prefix=run_context.hf_staging_prefix or None,
metadata={"run_context": run_context.to_dict()},
)


def _release_artifact_metadata_by_path(
run_id: str,
rel_paths: list[str],
) -> dict[str, dict[str, object]]:
"""Return local checksum/size metadata for staged release artifacts."""

metadata: dict[str, dict[str, object]] = {}
for local_path, rel_path in _full_release_manifest_files(run_id, rel_paths):
path = Path(local_path)
if not path.exists() or not path.is_file():
continue
reference = ArtifactReference.from_path(path)
metadata[rel_path] = {
"sha256": f"sha256:{reference.sha256}",
"size_bytes": reference.size_bytes,
}
return metadata


def _stage4_output_contract_repo_path_if_available(run_id: str) -> str | None:
"""Return the run-repo path for the Stage 4 contract when it exists locally."""

run_dir = _run_dir(run_id)
candidates = (
run_dir / "diagnostics" / "contracts" / "output_build_contract.json",
run_dir / "contracts" / "output_build_contract.json",
run_dir / "output_build_contract.json",
)
for path in candidates:
if path.exists() and path.is_file():
return f"calibration/runs/{run_id}/{path.relative_to(run_dir).as_posix()}"
return None


RUN_DIAGNOSTICS_VALIDATION_REPORT_FILENAMES = (
"validation_report.json",
"validation_summary.json",
"validation_results.csv",
"national_validation.txt",
)
RUN_DIAGNOSTICS_MANIFEST_FILENAMES = (
"manifest.json",
"diagnostics_manifest.json",
)


def _run_diagnostics_repo_path_if_available(run_id: str, filename: str) -> str | None:
"""Return the repo path for a run-local diagnostics file when present."""

run_dir = _run_dir(run_id)
path = run_dir / "diagnostics" / filename
if not path.exists() or not path.is_file():
return None
return f"calibration/runs/{run_id}/{path.relative_to(run_dir).as_posix()}"


def _run_validation_report_repo_paths_if_available(run_id: str) -> list[str]:
"""Return uploaded-run paths for validation diagnostics available locally."""

return [
repo_path
for filename in RUN_DIAGNOSTICS_VALIDATION_REPORT_FILENAMES
if (
repo_path := _run_diagnostics_repo_path_if_available(
run_id,
filename,
)
)
]


def _run_diagnostics_manifest_repo_path_if_available(run_id: str) -> str | None:
"""Return the run diagnostics manifest path when one exists locally."""

for filename in RUN_DIAGNOSTICS_MANIFEST_FILENAMES:
repo_path = _run_diagnostics_repo_path_if_available(run_id, filename)
if repo_path is not None:
return repo_path
return None


def _write_release_promotion_contract_for_run(
*,
meta: RunMetadata,
run_context: RunContext,
rel_paths: list[str],
promotion_result,
) -> ArtifactReference:
"""Write Stage 5's run-local contract and return its manifest reference."""

from policyengine_us_data.release_promotion import (
build_legacy_release_candidate_bundle,
release_promotion_contract_path,
write_release_promotion_contract,
)

run_dir = _run_dir(run_context.run_id)
contract_path = release_promotion_contract_path(run_dir)
candidate_bundle = build_legacy_release_candidate_bundle(
context=_release_promotion_context_from_run_context(run_context),
rel_paths=rel_paths,
artifact_metadata_by_path=_release_artifact_metadata_by_path(
run_context.run_id,
rel_paths,
),
source_output_contract_path=_stage4_output_contract_repo_path_if_available(
run_context.run_id
),
validation_report_paths=_run_validation_report_repo_paths_if_available(
run_context.run_id
),
diagnostics_manifest_path=_run_diagnostics_manifest_repo_path_if_available(
run_context.run_id
),
)
write_release_promotion_contract(
contract_path=contract_path,
candidate_bundle=candidate_bundle,
promotion_result=promotion_result,
created_at=datetime.now(timezone.utc).isoformat(),
code_sha=meta.sha,
package_version=meta.version,
metadata={
"writer": "modal_app.pipeline.promote_run",
"branch": meta.branch,
},
)
return ArtifactReference.from_path(
contract_path,
role="contract",
base_dir=run_dir,
media_type="application/json",
)


@app.function(
image=image,
timeout=300,
Expand Down Expand Up @@ -2039,6 +2208,13 @@ def promote_run(
promotion_context.to_dict(),
)
print(f" {promotion_stdout}")
promotion_result = _promotion_result_from_stdout(promotion_stdout)
release_promotion_contract_ref = _write_release_promotion_contract_for_run(
meta=meta,
run_context=promotion_context,
rel_paths=rel_paths,
promotion_result=promotion_result,
)

# Update run status only after all required promotion work succeeds.
meta.status = "promoted"
Expand All @@ -2047,8 +2223,11 @@ def promote_run(
_complete_step_manifest(
promote_manifest,
outputs=[
ArtifactReference.from_dict(artifact)
for artifact in promote_inputs["validated_step_outputs"]
*[
ArtifactReference.from_dict(artifact)
for artifact in promote_inputs["validated_step_outputs"]
],
release_promotion_contract_ref,
],
reuse_decision="computed",
vol=pipeline_volume,
Expand Down
Loading
Loading