From f21c6a0c661a1a66f0ab5c09f0bcc8d82c17a50e Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 20:26:48 +0000 Subject: [PATCH 1/5] Add reconcile_targets.py to align DB targets with policyengine-us params The policy_data.db targets table was populated with historical data (IRS SOI 2022, USDA SNAP FY2023) but never updated to match the 2024 simulation year. This caused state calibration aggregates to diverge from the CBO/Treasury projections used by loss.py. New reconciliation script (db/reconcile_targets.py): - Reads authoritative 2024 targets from policyengine-us parameters using the same parameter paths as loss.py build_loss_matrix() - Computes scale factors by comparing state-level DB aggregates to CBO/Treasury targets for income_tax, snap, eitc, and unemployment_compensation - Proportionally scales all geographic levels (national, state, district) and updates the period column to 2024 Also includes: - 4 new tests in test_reconcile_targets.py - Makefile updated to run reconciliation after ETLs, before validation - Black formatting fixes across the codebase Closes #503 https://claude.ai/code/session_01GisHzYtJZQQyUfVdRmWV2t --- Makefile | 1 + policyengine_us_data/datasets/cps/cps.py | 1 - .../datasets/cps/enhanced_cps.py | 1 - policyengine_us_data/datasets/puf/puf.py | 1 - .../datasets/puf/uprate_puf.py | 1 - .../db/create_database_tables.py | 1 - policyengine_us_data/db/etl_age.py | 1 - policyengine_us_data/db/etl_irs_soi.py | 1 - policyengine_us_data/db/reconcile_targets.py | 207 ++++++++++++++++++ policyengine_us_data/db/validate_database.py | 1 - .../calibration_targets/pull_snap_targets.py | 1 - .../tests/test_datasets/test_county_fips.py | 1 - .../tests/test_reconcile_targets.py | 168 ++++++++++++++ policyengine_us_data/utils/census.py | 1 - policyengine_us_data/utils/huggingface.py | 1 - policyengine_us_data/utils/loss.py | 1 - 16 files changed, 376 insertions(+), 13 deletions(-) create mode 100644 policyengine_us_data/db/reconcile_targets.py create mode 100644 policyengine_us_data/tests/test_reconcile_targets.py diff --git a/Makefile b/Makefile index b03e23d55..8f5ad180e 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,7 @@ database: python policyengine_us_data/db/etl_medicaid.py python policyengine_us_data/db/etl_snap.py python policyengine_us_data/db/etl_irs_soi.py + python policyengine_us_data/db/reconcile_targets.py python policyengine_us_data/db/validate_database.py data: diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index f932e0d5e..ae9e37904 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -15,7 +15,6 @@ from microimpute.models.qrf import QRF import logging - test_lite = os.environ.get("TEST_LITE") == "true" print(f"TEST_LITE == {test_lite}") diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 8bbe67bcc..4eb0a660b 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -22,7 +22,6 @@ from pathlib import Path import logging - try: import torch except ImportError: diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index cac9ad61a..7650d9e7f 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -15,7 +15,6 @@ create_policyengine_uprating_factors_table, ) - rng = np.random.default_rng(seed=64) # Get Qualified Business Income simulation 
parameters --- diff --git a/policyengine_us_data/datasets/puf/uprate_puf.py b/policyengine_us_data/datasets/puf/uprate_puf.py index 1cf0eb9c6..961446156 100644 --- a/policyengine_us_data/datasets/puf/uprate_puf.py +++ b/policyengine_us_data/datasets/puf/uprate_puf.py @@ -2,7 +2,6 @@ import numpy as np from policyengine_us_data.storage import STORAGE_FOLDER - ITMDED_GROW_RATE = 0.02 # annual growth rate in itemized deduction amounts USE_VARIABLE_SPECIFIC_POPULATION_GROWTH_DIVISORS = False diff --git a/policyengine_us_data/db/create_database_tables.py b/policyengine_us_data/db/create_database_tables.py index df03772d0..920d1449e 100644 --- a/policyengine_us_data/db/create_database_tables.py +++ b/policyengine_us_data/db/create_database_tables.py @@ -15,7 +15,6 @@ from policyengine_us_data.storage import STORAGE_FOLDER - logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", diff --git a/policyengine_us_data/db/etl_age.py b/policyengine_us_data/db/etl_age.py index bb83067c4..d80faf065 100644 --- a/policyengine_us_data/db/etl_age.py +++ b/policyengine_us_data/db/etl_age.py @@ -11,7 +11,6 @@ ) from policyengine_us_data.utils.census import get_census_docs, pull_acs_table - LABEL_TO_SHORT = { "Estimate!!Total!!Total population!!AGE!!Under 5 years": "0-4", "Estimate!!Total!!Total population!!AGE!!5 to 9 years": "5-9", diff --git a/policyengine_us_data/db/etl_irs_soi.py b/policyengine_us_data/db/etl_irs_soi.py index 786abb1cc..6607a5dd6 100644 --- a/policyengine_us_data/db/etl_irs_soi.py +++ b/policyengine_us_data/db/etl_irs_soi.py @@ -24,7 +24,6 @@ get_district_mapping, ) - """See the 22incddocguide.docx manual from the IRS SOI""" # Let's make this work with strict inequalities # Language in the doc: '$10,000 under $25,000' diff --git a/policyengine_us_data/db/reconcile_targets.py b/policyengine_us_data/db/reconcile_targets.py new file mode 100644 index 000000000..6c94b6356 --- /dev/null +++ b/policyengine_us_data/db/reconcile_targets.py @@ -0,0 +1,207 @@ +"""Reconcile stale database targets with policyengine-us parameters. + +After the ETL scripts populate policy_data.db with historical +administrative data (IRS SOI 2022, USDA SNAP FY2023), the aggregate +totals drift from the projections that loss.py uses for national +calibration. + +This script scales affected targets so that their national aggregates +match the policyengine-us parameter values for the simulation year, +applying the same proportional adjustment at every geographic level +(national, state, congressional district). + +See: https://github.com/PolicyEngine/policyengine-us-data/issues/503 +""" + +import logging +from typing import Dict, Optional, Tuple + +from sqlalchemy import text +from sqlmodel import Session, create_engine, select + +from policyengine_us.system import system +from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.db.create_database_tables import ( + Target, +) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +TARGET_YEAR = 2024 + + +def _get_authoritative_targets(year: int) -> Dict[str, float]: + """Read national targets from policyengine-us parameters. + + Uses the same parameter paths as loss.py ``build_loss_matrix`` + to ensure the database stays consistent with calibration. + + Args: + year: The simulation year. + + Returns: + Mapping of database variable name to authoritative value. 
+ """ + p = system.parameters(year) + + cbo = p.calibration.gov.cbo + targets: Dict[str, float] = { + # CBO budget projections (loss.py lines 201-220) + "income_tax": cbo._children["income_tax"], + "snap": cbo._children["snap"], + "unemployment_compensation": cbo._children[ + "unemployment_compensation" + ], + # Treasury EITC spending (loss.py lines 262-268) + "eitc": ( + system.parameters.calibration.gov.treasury.tax_expenditures.eitc( + year + ) + ), + } + return targets + + +def _compute_state_aggregate( + session: Session, variable: str +) -> Tuple[float, int]: + """Sum state-level target values for a variable. + + State strata are identified by a ``ucgid_str`` constraint whose + value starts with ``0400000US`` (two-digit state FIPS). + + A raw SQL join is used instead of the ORM because the + ``constraint_variable`` column stores values (like ``ucgid_str``) + that fall outside the ``USVariable`` enum, causing SQLAlchemy + deserialization errors if read through the model. + + Args: + session: Active database session. + variable: Target variable name. + + Returns: + Tuple of (sum of state-level targets, count of rows). + """ + result = session.execute( + text(""" + SELECT COALESCE(SUM(t.value), 0) AS total, + COUNT(*) AS cnt + FROM targets t + JOIN stratum_constraints sc + ON sc.stratum_id = t.stratum_id + WHERE t.variable = :variable + AND t.active = 1 + AND sc.constraint_variable = 'ucgid_str' + AND sc.value LIKE '0400000US%' + """), + {"variable": variable}, + ) + row = result.one() + return float(row.total), int(row.cnt) + + +def _scale_targets( + session: Session, + variable: str, + scale_factor: float, + target_year: int, +) -> int: + """Multiply every target for *variable* by *scale_factor*. + + Also updates the ``period`` column to *target_year* so the + database reflects the simulation year rather than the original + source year. + + Args: + session: Active database session. + variable: Target variable name. + scale_factor: Multiplicative adjustment factor. + target_year: New period value for updated rows. + + Returns: + Number of target rows updated. + """ + stmt = select(Target).where(Target.variable == variable) + all_targets = session.exec(stmt).all() + + updated = 0 + for t in all_targets: + if t.value is not None: + t.value *= scale_factor + t.period = target_year + session.add(t) + updated += 1 + + return updated + + +def reconcile_targets( + session: Session, + target_year: int = TARGET_YEAR, +) -> Dict[str, float]: + """Scale database targets to match policyengine-us parameters. + + For each reconcilable variable the script: + + 1. Sums current state-level DB targets to obtain the aggregate. + 2. Looks up the authoritative value from policyengine-us. + 3. Scales **all** geographic levels proportionally. + + Args: + session: Active database session. + target_year: Simulation year for the parameter lookup. + + Returns: + Mapping of variable name to the scale factor applied. 
+ """ + authoritative = _get_authoritative_targets(target_year) + scale_factors: Dict[str, float] = {} + + for variable, auth_value in authoritative.items(): + state_sum, state_count = _compute_state_aggregate(session, variable) + + if state_sum == 0: + logger.warning( + "Skipping '%s': no state-level targets in database", + variable, + ) + continue + + scale = auth_value / state_sum + pct = (scale - 1) * 100 + + logger.info( + "Reconciling '%s': " + "%d state rows summing to $%.1fB -> $%.1fB " + "(x%.4f, %+.1f%%)", + variable, + state_count, + state_sum / 1e9, + auth_value / 1e9, + scale, + pct, + ) + + n = _scale_targets(session, variable, scale, target_year) + logger.info(" Updated %d target rows for '%s'", n, variable) + scale_factors[variable] = scale + + session.commit() + logger.info("Reconciliation complete.") + return scale_factors + + +def main() -> None: + db_url = f"sqlite:///{STORAGE_FOLDER / 'policy_data.db'}" + engine = create_engine(db_url) + + with Session(engine) as session: + reconcile_targets(session) + + +if __name__ == "__main__": + main() diff --git a/policyengine_us_data/db/validate_database.py b/policyengine_us_data/db/validate_database.py index fee6a49dc..53ac09852 100644 --- a/policyengine_us_data/db/validate_database.py +++ b/policyengine_us_data/db/validate_database.py @@ -9,7 +9,6 @@ import pandas as pd from policyengine_us.system import system - conn = sqlite3.connect("policyengine_us_data/storage/policy_data.db") stratum_constraints_df = pd.read_sql("SELECT * FROM stratum_constraints", conn) diff --git a/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py b/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py index 349e6fbdd..1830bdb3a 100644 --- a/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py +++ b/policyengine_us_data/storage/calibration_targets/pull_snap_targets.py @@ -9,7 +9,6 @@ STATE_NAME_TO_ABBREV, ) - STATE_NAME_TO_FIPS = { "Alabama": "01", "Alaska": "02", diff --git a/policyengine_us_data/tests/test_datasets/test_county_fips.py b/policyengine_us_data/tests/test_datasets/test_county_fips.py index ad1f10c5c..d692cf559 100644 --- a/policyengine_us_data/tests/test_datasets/test_county_fips.py +++ b/policyengine_us_data/tests/test_datasets/test_county_fips.py @@ -10,7 +10,6 @@ LOCAL_FOLDER, ) - # Sample data that mimics the format from census.gov SAMPLE_CENSUS_DATA = """STATE|STATEFP|COUNTYFP|COUNTYNAME AL|01|001|Autauga County diff --git a/policyengine_us_data/tests/test_reconcile_targets.py b/policyengine_us_data/tests/test_reconcile_targets.py new file mode 100644 index 000000000..59df4a9ad --- /dev/null +++ b/policyengine_us_data/tests/test_reconcile_targets.py @@ -0,0 +1,168 @@ +"""Tests for the database target reconciliation step.""" + +import pytest +from sqlmodel import Session + +from policyengine_us_data.db.create_database_tables import ( + Stratum, + StratumConstraint, + Target, + create_database, +) +from policyengine_us_data.db.reconcile_targets import ( + _compute_state_aggregate, + _get_authoritative_targets, + _scale_targets, + reconcile_targets, + TARGET_YEAR, +) + + +@pytest.fixture +def engine(tmp_path): + db_uri = f"sqlite:///{tmp_path / 'test.db'}" + return create_database(db_uri) + + +def _make_stratum(session, ucgid, extra_constraints=None): + """Helper: create a stratum with a ucgid_str constraint.""" + s = Stratum(stratum_group_id=0, notes=f"Geo: {ucgid}") + constraints = [ + StratumConstraint( + constraint_variable="ucgid_str", + operation="in", + value=ucgid, + ) + ] + 
if extra_constraints: + constraints.extend(extra_constraints) + s.constraints_rel = constraints + session.add(s) + session.flush() + return s + + +def test_authoritative_targets_are_positive(): + """Smoke test: policyengine-us parameters return positive values.""" + targets = _get_authoritative_targets(TARGET_YEAR) + assert len(targets) >= 4 + for name, value in targets.items(): + assert value > 0, f"{name} should be positive, got {value}" + + +def test_compute_state_aggregate(engine): + """State-level targets are summed; national/district are excluded.""" + with Session(engine) as session: + # National target (should NOT count) + nat = _make_stratum(session, "0100000US") + nat.targets_rel = [ + Target( + variable="income_tax", + period=2022, + value=999e9, + active=True, + ) + ] + + # Two state targets (should count) + for fips, val in [("01", 50e9), ("06", 150e9)]: + st = _make_stratum(session, f"0400000US{fips}") + st.targets_rel = [ + Target( + variable="income_tax", + period=2022, + value=val, + active=True, + ) + ] + + # District target (should NOT count) + dist = _make_stratum(session, "5001800US0601") + dist.targets_rel = [ + Target( + variable="income_tax", + period=2022, + value=30e9, + active=True, + ) + ] + + session.commit() + + total, count = _compute_state_aggregate(session, "income_tax") + assert count == 2 + assert total == pytest.approx(200e9) + + +def test_scale_targets(engine): + """All targets for a variable are scaled and period updated.""" + with Session(engine) as session: + nat = _make_stratum(session, "0100000US") + nat.targets_rel = [ + Target( + variable="eitc", + period=2022, + value=100e9, + active=True, + ) + ] + st = _make_stratum(session, "0400000US01") + st.targets_rel = [ + Target( + variable="eitc", + period=2022, + value=40e9, + active=True, + ) + ] + session.commit() + + n = _scale_targets(session, "eitc", 1.5, 2024) + session.commit() + + assert n == 2 + for t in session.exec( + __import__("sqlmodel") + .select(Target) + .where(Target.variable == "eitc") + ).all(): + assert t.period == 2024 + assert t.value in [ + pytest.approx(150e9), + pytest.approx(60e9), + ] + + +def test_reconcile_targets_scales_correctly(engine): + """End-to-end: reconciliation scales DB to match parameters.""" + auth = _get_authoritative_targets(TARGET_YEAR) + income_tax_target = auth["income_tax"] + + with Session(engine) as session: + # Seed with a known state aggregate that differs from target + state_total = 0.0 + for fips, share in [ + ("01", 0.02), + ("06", 0.15), + ("48", 0.10), + ]: + # Use an intentionally stale value (half of target) + val = income_tax_target * share * 0.5 + state_total += val + st = _make_stratum(session, f"0400000US{fips}") + st.targets_rel = [ + Target( + variable="income_tax", + period=2022, + value=val, + active=True, + ) + ] + session.commit() + + factors = reconcile_targets(session, TARGET_YEAR) + + assert "income_tax" in factors + # After reconciliation state sum should match the target + new_total, _ = _compute_state_aggregate(session, "income_tax") + assert new_total == pytest.approx(income_tax_target, rel=1e-6) diff --git a/policyengine_us_data/utils/census.py b/policyengine_us_data/utils/census.py index 2f424ccb8..8081b6162 100644 --- a/policyengine_us_data/utils/census.py +++ b/policyengine_us_data/utils/census.py @@ -4,7 +4,6 @@ import pandas as pd import numpy as np - STATE_NAME_TO_FIPS = { "Alabama": "01", "Alaska": "02", diff --git a/policyengine_us_data/utils/huggingface.py b/policyengine_us_data/utils/huggingface.py index 
7b8c07928..d9b2f8a51 100644 --- a/policyengine_us_data/utils/huggingface.py +++ b/policyengine_us_data/utils/huggingface.py @@ -1,7 +1,6 @@ from huggingface_hub import hf_hub_download, login, HfApi import os - TOKEN = os.environ.get("HUGGING_FACE_TOKEN") if not TOKEN: raise ValueError( diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index cbea6dabb..e368d5048 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -9,7 +9,6 @@ from policyengine_core.reforms import Reform from policyengine_us_data.utils.soi import pe_to_soi, get_soi - # CPS-derived statistics # Medical expenses, sum of spm thresholds # Child support expenses From 3a9a091a6ed2f7eb8c6381b848825c70b80bc827 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 23:29:25 +0000 Subject: [PATCH 2/5] Expand reconciliation to all DB variables with 2024 parameters Extends the target mapping from 4 variables to 13, covering every IRS SOI ETL variable that has a 2024 equivalent in the policyengine-us calibration parameter tree: - CBO income_by_source: adjusted_gross_income, taxable_social_security, taxable_pension_income, net_capital_gain - IRS SOI: qualified_dividend_income, taxable_interest_income, tax_exempt_interest_income, partnership_s_corp_income (mapped from tax_unit_partnership_s_corp_income), dividend_income (sum of qualified + non_qualified) Test updated to assert all 13 variables are present and positive. https://claude.ai/code/session_01GisHzYtJZQQyUfVdRmWV2t --- policyengine_us_data/db/reconcile_targets.py | 37 +++++++++++++++++-- .../tests/test_reconcile_targets.py | 27 ++++++++++++-- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/policyengine_us_data/db/reconcile_targets.py b/policyengine_us_data/db/reconcile_targets.py index 6c94b6356..e30162a40 100644 --- a/policyengine_us_data/db/reconcile_targets.py +++ b/policyengine_us_data/db/reconcile_targets.py @@ -37,8 +37,10 @@ def _get_authoritative_targets(year: int) -> Dict[str, float]: """Read national targets from policyengine-us parameters. - Uses the same parameter paths as loss.py ``build_loss_matrix`` - to ensure the database stays consistent with calibration. + Covers every database variable for which a 2024 parameter exists + in the policyengine-us calibration tree. The dict keys are the + *database* variable names (as written by the ETL scripts); the + values are the authoritative national totals for *year*. Args: year: The simulation year. 
@@ -49,19 +51,46 @@ def _get_authoritative_targets(year: int) -> Dict[str, float]: p = system.parameters(year) cbo = p.calibration.gov.cbo + cbo_inc = cbo.income_by_source + soi = p.calibration.gov.irs.soi + targets: Dict[str, float] = { - # CBO budget projections (loss.py lines 201-220) + # ---- CBO budget projections ---- "income_tax": cbo._children["income_tax"], "snap": cbo._children["snap"], "unemployment_compensation": cbo._children[ "unemployment_compensation" ], - # Treasury EITC spending (loss.py lines 262-268) + # ---- Treasury ---- "eitc": ( system.parameters.calibration.gov.treasury.tax_expenditures.eitc( year ) ), + # ---- CBO income-by-source (DB vars from IRS SOI ETL) ---- + "adjusted_gross_income": cbo_inc._children["adjusted_gross_income"], + "taxable_social_security": cbo_inc._children[ + "taxable_social_security" + ], + "taxable_pension_income": cbo_inc._children["taxable_pension_income"], + "net_capital_gain": cbo_inc._children["net_capital_gain"], + # ---- IRS SOI calibration parameters ---- + "qualified_dividend_income": soi._children[ + "qualified_dividend_income" + ], + "taxable_interest_income": soi._children["taxable_interest_income"], + "tax_exempt_interest_income": soi._children[ + "tax_exempt_interest_income" + ], + # DB name differs from param name + "tax_unit_partnership_s_corp_income": soi._children[ + "partnership_s_corp_income" + ], + # ordinary dividends = qualified + non-qualified + "dividend_income": ( + soi._children["qualified_dividend_income"] + + soi._children["non_qualified_dividend_income"] + ), } return targets diff --git a/policyengine_us_data/tests/test_reconcile_targets.py b/policyengine_us_data/tests/test_reconcile_targets.py index 59df4a9ad..67da35643 100644 --- a/policyengine_us_data/tests/test_reconcile_targets.py +++ b/policyengine_us_data/tests/test_reconcile_targets.py @@ -42,12 +42,31 @@ def _make_stratum(session, ucgid, extra_constraints=None): return s +EXPECTED_VARIABLES = [ + "income_tax", + "snap", + "unemployment_compensation", + "eitc", + "adjusted_gross_income", + "taxable_social_security", + "taxable_pension_income", + "net_capital_gain", + "qualified_dividend_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "tax_unit_partnership_s_corp_income", + "dividend_income", +] + + def test_authoritative_targets_are_positive(): - """Smoke test: policyengine-us parameters return positive values.""" + """All mapped variables return positive 2024 values.""" targets = _get_authoritative_targets(TARGET_YEAR) - assert len(targets) >= 4 - for name, value in targets.items(): - assert value > 0, f"{name} should be positive, got {value}" + for name in EXPECTED_VARIABLES: + assert name in targets, f"'{name}' missing from target map" + assert ( + targets[name] > 0 + ), f"{name} should be positive, got {targets[name]}" def test_compute_state_aggregate(engine): From 58414eb859100e554487e86499c8b652c463a758 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 1 Feb 2026 23:45:03 +0000 Subject: [PATCH 3/5] Disambiguate reconciliation targets by (variable, source_id) Variables like person_count appear in multiple ETL sources with different meanings (census age, medicaid enrollment, IRS SOI returns). The previous code filtered only by variable name, which would incorrectly mix targets from different sources. Now each target is keyed by (variable, source_id) and queries filter on both columns. 
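For illustration, the authoritative map is now keyed by tuples rather than bare
variable names (keys below match the new mapping; values elided):

    targets = {
        ("income_tax", 5): ...,    # IRS SOI ETL
        ("person_count", 1): ...,  # Census age ETL
        ("person_count", 2): ...,  # Medicaid ETL
    }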
Also adds reconciliation for person_count from all three sources: - source_id=1 (Census age) -> census.populations.total - source_id=2 (Medicaid) -> sum of state medicaid enrollment params - source_id=5 (IRS SOI) -> sum of returns by filing status Closes #503 https://claude.ai/code/session_01GisHzYtJZQQyUfVdRmWV2t --- policyengine_us_data/db/reconcile_targets.py | 147 +++++++++----- .../tests/test_reconcile_targets.py | 184 +++++++++++++++--- 2 files changed, 260 insertions(+), 71 deletions(-) diff --git a/policyengine_us_data/db/reconcile_targets.py b/policyengine_us_data/db/reconcile_targets.py index e30162a40..a3e263978 100644 --- a/policyengine_us_data/db/reconcile_targets.py +++ b/policyengine_us_data/db/reconcile_targets.py @@ -10,11 +10,16 @@ applying the same proportional adjustment at every geographic level (national, state, congressional district). +Each target is identified by both its variable name AND its source_id +to disambiguate cases like person_count, which appears in the age ETL +(source_id=1), medicaid ETL (source_id=2), and IRS SOI ETL +(source_id=5) with different meanings. + See: https://github.com/PolicyEngine/policyengine-us-data/issues/503 """ import logging -from typing import Dict, Optional, Tuple +from typing import Dict, Tuple from sqlalchemy import text from sqlmodel import Session, create_engine, select @@ -33,72 +38,111 @@ TARGET_YEAR = 2024 +# Type alias: (variable_name, source_id) +TargetKey = Tuple[str, int] + -def _get_authoritative_targets(year: int) -> Dict[str, float]: +def _get_authoritative_targets( + year: int, +) -> Dict[TargetKey, float]: """Read national targets from policyengine-us parameters. - Covers every database variable for which a 2024 parameter exists - in the policyengine-us calibration tree. The dict keys are the - *database* variable names (as written by the ETL scripts); the - values are the authoritative national totals for *year*. + Each target is keyed by ``(variable_name, source_id)`` to handle + variables like ``person_count`` that appear in multiple ETL + sources with different meanings. + + Source IDs follow the ETL convention: + 1 = Census age (etl_age) + 2 = Medicaid enrollment (etl_medicaid) + 3 = SNAP admin (etl_snap) + 5 = IRS SOI (etl_irs_soi) Args: year: The simulation year. Returns: - Mapping of database variable name to authoritative value. + Mapping of (variable, source_id) to authoritative value. 
""" p = system.parameters(year) - cbo = p.calibration.gov.cbo + cal = p.calibration.gov + cbo = cal.cbo cbo_inc = cbo.income_by_source - soi = p.calibration.gov.irs.soi + soi = cal.irs.soi + + # Medicaid enrollment: sum state-level values + medicaid_enrollment = sum( + v + for v in cal.hhs.medicaid.totals.enrollment._children.values() + if isinstance(v, (int, float)) + ) + + # IRS SOI total returns: sum by filing status + soi_total_returns = sum( + v + for v in soi.returns_by_filing_status._children.values() + if isinstance(v, (int, float)) + ) - targets: Dict[str, float] = { - # ---- CBO budget projections ---- - "income_tax": cbo._children["income_tax"], - "snap": cbo._children["snap"], - "unemployment_compensation": cbo._children[ + targets: Dict[TargetKey, float] = { + # ---- IRS SOI ETL (source_id=5) ---- + # CBO budget projections + ("income_tax", 5): cbo._children["income_tax"], + ("unemployment_compensation", 5): cbo._children[ "unemployment_compensation" ], - # ---- Treasury ---- - "eitc": ( + # Treasury + ("eitc", 5): ( system.parameters.calibration.gov.treasury.tax_expenditures.eitc( year ) ), - # ---- CBO income-by-source (DB vars from IRS SOI ETL) ---- - "adjusted_gross_income": cbo_inc._children["adjusted_gross_income"], - "taxable_social_security": cbo_inc._children[ + # CBO income-by-source + ("adjusted_gross_income", 5): cbo_inc._children[ + "adjusted_gross_income" + ], + ("taxable_social_security", 5): cbo_inc._children[ "taxable_social_security" ], - "taxable_pension_income": cbo_inc._children["taxable_pension_income"], - "net_capital_gain": cbo_inc._children["net_capital_gain"], - # ---- IRS SOI calibration parameters ---- - "qualified_dividend_income": soi._children[ + ("taxable_pension_income", 5): cbo_inc._children[ + "taxable_pension_income" + ], + ("net_capital_gain", 5): cbo_inc._children["net_capital_gain"], + # IRS SOI calibration parameters + ("qualified_dividend_income", 5): soi._children[ "qualified_dividend_income" ], - "taxable_interest_income": soi._children["taxable_interest_income"], - "tax_exempt_interest_income": soi._children[ + ("taxable_interest_income", 5): soi._children[ + "taxable_interest_income" + ], + ("tax_exempt_interest_income", 5): soi._children[ "tax_exempt_interest_income" ], # DB name differs from param name - "tax_unit_partnership_s_corp_income": soi._children[ + ("tax_unit_partnership_s_corp_income", 5): soi._children[ "partnership_s_corp_income" ], # ordinary dividends = qualified + non-qualified - "dividend_income": ( + ("dividend_income", 5): ( soi._children["qualified_dividend_income"] + soi._children["non_qualified_dividend_income"] ), + # IRS SOI person_count (total returns) + ("person_count", 5): soi_total_returns, + # ---- SNAP ETL (source_id=3) ---- + ("snap", 3): cbo._children["snap"], + # ---- Census Age ETL (source_id=1) ---- + ("person_count", 1): cal.census.populations._children["total"], + # ---- Medicaid ETL (source_id=2) ---- + ("person_count", 2): medicaid_enrollment, } return targets def _compute_state_aggregate( - session: Session, variable: str + session: Session, variable: str, source_id: int ) -> Tuple[float, int]: - """Sum state-level target values for a variable. + """Sum state-level target values for a variable and source. State strata are identified by a ``ucgid_str`` constraint whose value starts with ``0400000US`` (two-digit state FIPS). @@ -111,6 +155,7 @@ def _compute_state_aggregate( Args: session: Active database session. variable: Target variable name. 
+ source_id: ETL source identifier to disambiguate targets. Returns: Tuple of (sum of state-level targets, count of rows). @@ -123,11 +168,12 @@ def _compute_state_aggregate( JOIN stratum_constraints sc ON sc.stratum_id = t.stratum_id WHERE t.variable = :variable + AND t.source_id = :source_id AND t.active = 1 AND sc.constraint_variable = 'ucgid_str' AND sc.value LIKE '0400000US%' """), - {"variable": variable}, + {"variable": variable, "source_id": source_id}, ) row = result.one() return float(row.total), int(row.cnt) @@ -136,10 +182,11 @@ def _compute_state_aggregate( def _scale_targets( session: Session, variable: str, + source_id: int, scale_factor: float, target_year: int, ) -> int: - """Multiply every target for *variable* by *scale_factor*. + """Multiply every target for *variable* / *source_id* by *scale*. Also updates the ``period`` column to *target_year* so the database reflects the simulation year rather than the original @@ -148,13 +195,18 @@ def _scale_targets( Args: session: Active database session. variable: Target variable name. + source_id: ETL source identifier to disambiguate targets. scale_factor: Multiplicative adjustment factor. target_year: New period value for updated rows. Returns: Number of target rows updated. """ - stmt = select(Target).where(Target.variable == variable) + stmt = ( + select(Target) + .where(Target.variable == variable) + .where(Target.source_id == source_id) + ) all_targets = session.exec(stmt).all() updated = 0 @@ -171,10 +223,10 @@ def _scale_targets( def reconcile_targets( session: Session, target_year: int = TARGET_YEAR, -) -> Dict[str, float]: +) -> Dict[TargetKey, float]: """Scale database targets to match policyengine-us parameters. - For each reconcilable variable the script: + For each reconcilable (variable, source_id) pair the script: 1. Sums current state-level DB targets to obtain the aggregate. 2. Looks up the authoritative value from policyengine-us. @@ -185,18 +237,22 @@ def reconcile_targets( target_year: Simulation year for the parameter lookup. Returns: - Mapping of variable name to the scale factor applied. + Mapping of (variable, source_id) to the scale factor applied. 
""" authoritative = _get_authoritative_targets(target_year) - scale_factors: Dict[str, float] = {} + scale_factors: Dict[TargetKey, float] = {} - for variable, auth_value in authoritative.items(): - state_sum, state_count = _compute_state_aggregate(session, variable) + for (variable, source_id), auth_value in authoritative.items(): + state_sum, state_count = _compute_state_aggregate( + session, variable, source_id + ) if state_sum == 0: logger.warning( - "Skipping '%s': no state-level targets in database", + "Skipping '%s' (source %d): " + "no state-level targets in database", variable, + source_id, ) continue @@ -204,20 +260,21 @@ def reconcile_targets( pct = (scale - 1) * 100 logger.info( - "Reconciling '%s': " - "%d state rows summing to $%.1fB -> $%.1fB " + "Reconciling '%s' (source %d): " + "%d state rows summing to %.3g -> %.3g " "(x%.4f, %+.1f%%)", variable, + source_id, state_count, - state_sum / 1e9, - auth_value / 1e9, + state_sum, + auth_value, scale, pct, ) - n = _scale_targets(session, variable, scale, target_year) + n = _scale_targets(session, variable, source_id, scale, target_year) logger.info(" Updated %d target rows for '%s'", n, variable) - scale_factors[variable] = scale + scale_factors[(variable, source_id)] = scale session.commit() logger.info("Reconciliation complete.") diff --git a/policyengine_us_data/tests/test_reconcile_targets.py b/policyengine_us_data/tests/test_reconcile_targets.py index 67da35643..9d0bddc24 100644 --- a/policyengine_us_data/tests/test_reconcile_targets.py +++ b/policyengine_us_data/tests/test_reconcile_targets.py @@ -42,35 +42,39 @@ def _make_stratum(session, ucgid, extra_constraints=None): return s -EXPECTED_VARIABLES = [ - "income_tax", - "snap", - "unemployment_compensation", - "eitc", - "adjusted_gross_income", - "taxable_social_security", - "taxable_pension_income", - "net_capital_gain", - "qualified_dividend_income", - "taxable_interest_income", - "tax_exempt_interest_income", - "tax_unit_partnership_s_corp_income", - "dividend_income", +# Expected (variable, source_id) pairs in the authoritative map. 
+EXPECTED_TARGETS = [ + ("income_tax", 5), + ("snap", 3), + ("unemployment_compensation", 5), + ("eitc", 5), + ("adjusted_gross_income", 5), + ("taxable_social_security", 5), + ("taxable_pension_income", 5), + ("net_capital_gain", 5), + ("qualified_dividend_income", 5), + ("taxable_interest_income", 5), + ("tax_exempt_interest_income", 5), + ("tax_unit_partnership_s_corp_income", 5), + ("dividend_income", 5), + ("person_count", 1), + ("person_count", 2), + ("person_count", 5), ] def test_authoritative_targets_are_positive(): - """All mapped variables return positive 2024 values.""" + """All mapped (variable, source_id) pairs return positive values.""" targets = _get_authoritative_targets(TARGET_YEAR) - for name in EXPECTED_VARIABLES: - assert name in targets, f"'{name}' missing from target map" + for key in EXPECTED_TARGETS: + assert key in targets, f"{key} missing from target map" assert ( - targets[name] > 0 - ), f"{name} should be positive, got {targets[name]}" + targets[key] > 0 + ), f"{key} should be positive, got {targets[key]}" def test_compute_state_aggregate(engine): - """State-level targets are summed; national/district are excluded.""" + """State-level targets are summed; national/district excluded.""" with Session(engine) as session: # National target (should NOT count) nat = _make_stratum(session, "0100000US") @@ -79,6 +83,7 @@ def test_compute_state_aggregate(engine): variable="income_tax", period=2022, value=999e9, + source_id=5, active=True, ) ] @@ -91,6 +96,7 @@ def test_compute_state_aggregate(engine): variable="income_tax", period=2022, value=val, + source_id=5, active=True, ) ] @@ -102,19 +108,82 @@ def test_compute_state_aggregate(engine): variable="income_tax", period=2022, value=30e9, + source_id=5, active=True, ) ] session.commit() - total, count = _compute_state_aggregate(session, "income_tax") + total, count = _compute_state_aggregate( + session, "income_tax", source_id=5 + ) assert count == 2 assert total == pytest.approx(200e9) +def test_compute_state_aggregate_filters_by_source(engine): + """Same variable with different source_ids are counted separately.""" + with Session(engine) as session: + # Use different states to avoid definition_hash collision. 
+ # person_count from census age ETL (source_id=1) + st1 = _make_stratum(session, "0400000US01") + st1.targets_rel = [ + Target( + variable="person_count", + period=2023, + value=5_000_000, + source_id=1, + active=True, + ) + ] + + # person_count from medicaid ETL (source_id=2) + st2 = _make_stratum(session, "0400000US06") + st2.targets_rel = [ + Target( + variable="person_count", + period=2023, + value=1_200_000, + source_id=2, + active=True, + ) + ] + + # person_count from IRS SOI ETL (source_id=5) + st3 = _make_stratum(session, "0400000US48") + st3.targets_rel = [ + Target( + variable="person_count", + period=2022, + value=2_500_000, + source_id=5, + active=True, + ) + ] + + session.commit() + + total_age, count_age = _compute_state_aggregate( + session, "person_count", source_id=1 + ) + total_med, count_med = _compute_state_aggregate( + session, "person_count", source_id=2 + ) + total_soi, count_soi = _compute_state_aggregate( + session, "person_count", source_id=5 + ) + + assert count_age == 1 + assert total_age == pytest.approx(5_000_000) + assert count_med == 1 + assert total_med == pytest.approx(1_200_000) + assert count_soi == 1 + assert total_soi == pytest.approx(2_500_000) + + def test_scale_targets(engine): - """All targets for a variable are scaled and period updated.""" + """Targets for a variable+source are scaled; period updated.""" with Session(engine) as session: nat = _make_stratum(session, "0100000US") nat.targets_rel = [ @@ -122,6 +191,7 @@ def test_scale_targets(engine): variable="eitc", period=2022, value=100e9, + source_id=5, active=True, ) ] @@ -131,12 +201,19 @@ def test_scale_targets(engine): variable="eitc", period=2022, value=40e9, + source_id=5, active=True, ) ] session.commit() - n = _scale_targets(session, "eitc", 1.5, 2024) + n = _scale_targets( + session, + "eitc", + source_id=5, + scale_factor=1.5, + target_year=2024, + ) session.commit() assert n == 2 @@ -152,10 +229,62 @@ def test_scale_targets(engine): ] +def test_scale_targets_isolates_sources(engine): + """Scaling one source_id does not touch another source_id.""" + with Session(engine) as session: + # Use separate strata to avoid unique-constraint collision + # on (variable, period, stratum_id, reform_id). 
+ st1 = _make_stratum(session, "0400000US06") + st1.targets_rel = [ + Target( + variable="person_count", + period=2023, + value=39_000_000, + source_id=1, + active=True, + ), + ] + st2 = _make_stratum(session, "0400000US48") + st2.targets_rel = [ + Target( + variable="person_count", + period=2023, + value=14_000_000, + source_id=2, + active=True, + ), + ] + session.commit() + + # Scale only census age targets (source_id=1) + _scale_targets( + session, + "person_count", + source_id=1, + scale_factor=1.1, + target_year=2024, + ) + session.commit() + + targets = session.exec( + __import__("sqlmodel") + .select(Target) + .where(Target.variable == "person_count") + ).all() + + by_source = {t.source_id: t for t in targets} + # source_id=1 was scaled + assert by_source[1].value == pytest.approx(39_000_000 * 1.1) + assert by_source[1].period == 2024 + # source_id=2 was NOT touched + assert by_source[2].value == pytest.approx(14_000_000) + assert by_source[2].period == 2023 + + def test_reconcile_targets_scales_correctly(engine): """End-to-end: reconciliation scales DB to match parameters.""" auth = _get_authoritative_targets(TARGET_YEAR) - income_tax_target = auth["income_tax"] + income_tax_target = auth[("income_tax", 5)] with Session(engine) as session: # Seed with a known state aggregate that differs from target @@ -174,6 +303,7 @@ def test_reconcile_targets_scales_correctly(engine): variable="income_tax", period=2022, value=val, + source_id=5, active=True, ) ] @@ -181,7 +311,9 @@ def test_reconcile_targets_scales_correctly(engine): factors = reconcile_targets(session, TARGET_YEAR) - assert "income_tax" in factors + assert ("income_tax", 5) in factors # After reconciliation state sum should match the target - new_total, _ = _compute_state_aggregate(session, "income_tax") + new_total, _ = _compute_state_aggregate( + session, "income_tax", source_id=5 + ) assert new_total == pytest.approx(income_tax_target, rel=1e-6) From 79e3444eca7b28374afd63ae54e70786280982e0 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 00:33:22 +0000 Subject: [PATCH 4/5] Update ETL scripts to 2024 data, remove parameter-based reconciliation Instead of reconciling stale DB targets against policyengine-us parameters, update the ETL scripts to pull 2024 data directly from their administrative sources: - etl_age.py: Census ACS 2023 -> 2024 (confirmed available) - etl_medicaid.py: Medicaid.gov 2023 -> 2024 (confirmed available) - etl_snap.py: USDA SNAP FY2023 -> FY2024 (confirmed available) - etl_irs_soi.py: stays at 2022 (2023/2024 not yet published by IRS) Removes reconcile_targets.py and its tests, which scaled DB targets using policyengine-us parameters. The DB ETL should pull directly from administrative sources rather than going through policyengine-us as an intermediary. 
Closes #503 https://claude.ai/code/session_01GisHzYtJZQQyUfVdRmWV2t --- Makefile | 1 - policyengine_us_data/db/etl_age.py | 2 +- policyengine_us_data/db/etl_medicaid.py | 2 +- policyengine_us_data/db/etl_snap.py | 4 +- policyengine_us_data/db/reconcile_targets.py | 293 ---------------- .../tests/test_reconcile_targets.py | 319 ------------------ 6 files changed, 4 insertions(+), 617 deletions(-) delete mode 100644 policyengine_us_data/db/reconcile_targets.py delete mode 100644 policyengine_us_data/tests/test_reconcile_targets.py diff --git a/Makefile b/Makefile index 8f5ad180e..b03e23d55 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,6 @@ database: python policyengine_us_data/db/etl_medicaid.py python policyengine_us_data/db/etl_snap.py python policyengine_us_data/db/etl_irs_soi.py - python policyengine_us_data/db/reconcile_targets.py python policyengine_us_data/db/validate_database.py data: diff --git a/policyengine_us_data/db/etl_age.py b/policyengine_us_data/db/etl_age.py index d80faf065..2f8b4b3df 100644 --- a/policyengine_us_data/db/etl_age.py +++ b/policyengine_us_data/db/etl_age.py @@ -184,7 +184,7 @@ def load_age_data(df_long, geo, year, stratum_lookup=None): if __name__ == "__main__": # --- ETL: Extract, Transform, Load ---- - year = 2023 + year = 2024 # ---- Extract ---------- docs = get_census_docs(year) diff --git a/policyengine_us_data/db/etl_medicaid.py b/policyengine_us_data/db/etl_medicaid.py index 926a0d88c..e3d46d5c7 100644 --- a/policyengine_us_data/db/etl_medicaid.py +++ b/policyengine_us_data/db/etl_medicaid.py @@ -194,7 +194,7 @@ def load_medicaid_data(long_state, long_cd, year): if __name__ == "__main__": - year = 2023 + year = 2024 # Extract ------------------------------ cd_survey_df, state_admin_df = extract_medicaid_data(year) diff --git a/policyengine_us_data/db/etl_snap.py b/policyengine_us_data/db/etl_snap.py index 1fba44a46..a39411ef5 100644 --- a/policyengine_us_data/db/etl_snap.py +++ b/policyengine_us_data/db/etl_snap.py @@ -20,7 +20,7 @@ ) -def extract_administrative_snap_data(year=2023): +def extract_administrative_snap_data(year=2024): """ Downloads and extracts annual state-level SNAP data from the USDA FNS zip file. """ @@ -280,7 +280,7 @@ def load_survey_snap_data(survey_df, year, stratum_lookup=None): def main(): - year = 2023 + year = 2024 # Extract --------- zip_file_admin = extract_administrative_snap_data() diff --git a/policyengine_us_data/db/reconcile_targets.py b/policyengine_us_data/db/reconcile_targets.py deleted file mode 100644 index a3e263978..000000000 --- a/policyengine_us_data/db/reconcile_targets.py +++ /dev/null @@ -1,293 +0,0 @@ -"""Reconcile stale database targets with policyengine-us parameters. - -After the ETL scripts populate policy_data.db with historical -administrative data (IRS SOI 2022, USDA SNAP FY2023), the aggregate -totals drift from the projections that loss.py uses for national -calibration. - -This script scales affected targets so that their national aggregates -match the policyengine-us parameter values for the simulation year, -applying the same proportional adjustment at every geographic level -(national, state, congressional district). - -Each target is identified by both its variable name AND its source_id -to disambiguate cases like person_count, which appears in the age ETL -(source_id=1), medicaid ETL (source_id=2), and IRS SOI ETL -(source_id=5) with different meanings. 
- -See: https://github.com/PolicyEngine/policyengine-us-data/issues/503 -""" - -import logging -from typing import Dict, Tuple - -from sqlalchemy import text -from sqlmodel import Session, create_engine, select - -from policyengine_us.system import system -from policyengine_us_data.storage import STORAGE_FOLDER -from policyengine_us_data.db.create_database_tables import ( - Target, -) - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - -TARGET_YEAR = 2024 - -# Type alias: (variable_name, source_id) -TargetKey = Tuple[str, int] - - -def _get_authoritative_targets( - year: int, -) -> Dict[TargetKey, float]: - """Read national targets from policyengine-us parameters. - - Each target is keyed by ``(variable_name, source_id)`` to handle - variables like ``person_count`` that appear in multiple ETL - sources with different meanings. - - Source IDs follow the ETL convention: - 1 = Census age (etl_age) - 2 = Medicaid enrollment (etl_medicaid) - 3 = SNAP admin (etl_snap) - 5 = IRS SOI (etl_irs_soi) - - Args: - year: The simulation year. - - Returns: - Mapping of (variable, source_id) to authoritative value. - """ - p = system.parameters(year) - - cal = p.calibration.gov - cbo = cal.cbo - cbo_inc = cbo.income_by_source - soi = cal.irs.soi - - # Medicaid enrollment: sum state-level values - medicaid_enrollment = sum( - v - for v in cal.hhs.medicaid.totals.enrollment._children.values() - if isinstance(v, (int, float)) - ) - - # IRS SOI total returns: sum by filing status - soi_total_returns = sum( - v - for v in soi.returns_by_filing_status._children.values() - if isinstance(v, (int, float)) - ) - - targets: Dict[TargetKey, float] = { - # ---- IRS SOI ETL (source_id=5) ---- - # CBO budget projections - ("income_tax", 5): cbo._children["income_tax"], - ("unemployment_compensation", 5): cbo._children[ - "unemployment_compensation" - ], - # Treasury - ("eitc", 5): ( - system.parameters.calibration.gov.treasury.tax_expenditures.eitc( - year - ) - ), - # CBO income-by-source - ("adjusted_gross_income", 5): cbo_inc._children[ - "adjusted_gross_income" - ], - ("taxable_social_security", 5): cbo_inc._children[ - "taxable_social_security" - ], - ("taxable_pension_income", 5): cbo_inc._children[ - "taxable_pension_income" - ], - ("net_capital_gain", 5): cbo_inc._children["net_capital_gain"], - # IRS SOI calibration parameters - ("qualified_dividend_income", 5): soi._children[ - "qualified_dividend_income" - ], - ("taxable_interest_income", 5): soi._children[ - "taxable_interest_income" - ], - ("tax_exempt_interest_income", 5): soi._children[ - "tax_exempt_interest_income" - ], - # DB name differs from param name - ("tax_unit_partnership_s_corp_income", 5): soi._children[ - "partnership_s_corp_income" - ], - # ordinary dividends = qualified + non-qualified - ("dividend_income", 5): ( - soi._children["qualified_dividend_income"] - + soi._children["non_qualified_dividend_income"] - ), - # IRS SOI person_count (total returns) - ("person_count", 5): soi_total_returns, - # ---- SNAP ETL (source_id=3) ---- - ("snap", 3): cbo._children["snap"], - # ---- Census Age ETL (source_id=1) ---- - ("person_count", 1): cal.census.populations._children["total"], - # ---- Medicaid ETL (source_id=2) ---- - ("person_count", 2): medicaid_enrollment, - } - return targets - - -def _compute_state_aggregate( - session: Session, variable: str, source_id: int -) -> Tuple[float, int]: - """Sum state-level target values for a variable and source. 
- - State strata are identified by a ``ucgid_str`` constraint whose - value starts with ``0400000US`` (two-digit state FIPS). - - A raw SQL join is used instead of the ORM because the - ``constraint_variable`` column stores values (like ``ucgid_str``) - that fall outside the ``USVariable`` enum, causing SQLAlchemy - deserialization errors if read through the model. - - Args: - session: Active database session. - variable: Target variable name. - source_id: ETL source identifier to disambiguate targets. - - Returns: - Tuple of (sum of state-level targets, count of rows). - """ - result = session.execute( - text(""" - SELECT COALESCE(SUM(t.value), 0) AS total, - COUNT(*) AS cnt - FROM targets t - JOIN stratum_constraints sc - ON sc.stratum_id = t.stratum_id - WHERE t.variable = :variable - AND t.source_id = :source_id - AND t.active = 1 - AND sc.constraint_variable = 'ucgid_str' - AND sc.value LIKE '0400000US%' - """), - {"variable": variable, "source_id": source_id}, - ) - row = result.one() - return float(row.total), int(row.cnt) - - -def _scale_targets( - session: Session, - variable: str, - source_id: int, - scale_factor: float, - target_year: int, -) -> int: - """Multiply every target for *variable* / *source_id* by *scale*. - - Also updates the ``period`` column to *target_year* so the - database reflects the simulation year rather than the original - source year. - - Args: - session: Active database session. - variable: Target variable name. - source_id: ETL source identifier to disambiguate targets. - scale_factor: Multiplicative adjustment factor. - target_year: New period value for updated rows. - - Returns: - Number of target rows updated. - """ - stmt = ( - select(Target) - .where(Target.variable == variable) - .where(Target.source_id == source_id) - ) - all_targets = session.exec(stmt).all() - - updated = 0 - for t in all_targets: - if t.value is not None: - t.value *= scale_factor - t.period = target_year - session.add(t) - updated += 1 - - return updated - - -def reconcile_targets( - session: Session, - target_year: int = TARGET_YEAR, -) -> Dict[TargetKey, float]: - """Scale database targets to match policyengine-us parameters. - - For each reconcilable (variable, source_id) pair the script: - - 1. Sums current state-level DB targets to obtain the aggregate. - 2. Looks up the authoritative value from policyengine-us. - 3. Scales **all** geographic levels proportionally. - - Args: - session: Active database session. - target_year: Simulation year for the parameter lookup. - - Returns: - Mapping of (variable, source_id) to the scale factor applied. 
- """ - authoritative = _get_authoritative_targets(target_year) - scale_factors: Dict[TargetKey, float] = {} - - for (variable, source_id), auth_value in authoritative.items(): - state_sum, state_count = _compute_state_aggregate( - session, variable, source_id - ) - - if state_sum == 0: - logger.warning( - "Skipping '%s' (source %d): " - "no state-level targets in database", - variable, - source_id, - ) - continue - - scale = auth_value / state_sum - pct = (scale - 1) * 100 - - logger.info( - "Reconciling '%s' (source %d): " - "%d state rows summing to %.3g -> %.3g " - "(x%.4f, %+.1f%%)", - variable, - source_id, - state_count, - state_sum, - auth_value, - scale, - pct, - ) - - n = _scale_targets(session, variable, source_id, scale, target_year) - logger.info(" Updated %d target rows for '%s'", n, variable) - scale_factors[(variable, source_id)] = scale - - session.commit() - logger.info("Reconciliation complete.") - return scale_factors - - -def main() -> None: - db_url = f"sqlite:///{STORAGE_FOLDER / 'policy_data.db'}" - engine = create_engine(db_url) - - with Session(engine) as session: - reconcile_targets(session) - - -if __name__ == "__main__": - main() diff --git a/policyengine_us_data/tests/test_reconcile_targets.py b/policyengine_us_data/tests/test_reconcile_targets.py deleted file mode 100644 index 9d0bddc24..000000000 --- a/policyengine_us_data/tests/test_reconcile_targets.py +++ /dev/null @@ -1,319 +0,0 @@ -"""Tests for the database target reconciliation step.""" - -import pytest -from sqlmodel import Session - -from policyengine_us_data.db.create_database_tables import ( - Stratum, - StratumConstraint, - Target, - create_database, -) -from policyengine_us_data.db.reconcile_targets import ( - _compute_state_aggregate, - _get_authoritative_targets, - _scale_targets, - reconcile_targets, - TARGET_YEAR, -) - - -@pytest.fixture -def engine(tmp_path): - db_uri = f"sqlite:///{tmp_path / 'test.db'}" - return create_database(db_uri) - - -def _make_stratum(session, ucgid, extra_constraints=None): - """Helper: create a stratum with a ucgid_str constraint.""" - s = Stratum(stratum_group_id=0, notes=f"Geo: {ucgid}") - constraints = [ - StratumConstraint( - constraint_variable="ucgid_str", - operation="in", - value=ucgid, - ) - ] - if extra_constraints: - constraints.extend(extra_constraints) - s.constraints_rel = constraints - session.add(s) - session.flush() - return s - - -# Expected (variable, source_id) pairs in the authoritative map. 
-EXPECTED_TARGETS = [ - ("income_tax", 5), - ("snap", 3), - ("unemployment_compensation", 5), - ("eitc", 5), - ("adjusted_gross_income", 5), - ("taxable_social_security", 5), - ("taxable_pension_income", 5), - ("net_capital_gain", 5), - ("qualified_dividend_income", 5), - ("taxable_interest_income", 5), - ("tax_exempt_interest_income", 5), - ("tax_unit_partnership_s_corp_income", 5), - ("dividend_income", 5), - ("person_count", 1), - ("person_count", 2), - ("person_count", 5), -] - - -def test_authoritative_targets_are_positive(): - """All mapped (variable, source_id) pairs return positive values.""" - targets = _get_authoritative_targets(TARGET_YEAR) - for key in EXPECTED_TARGETS: - assert key in targets, f"{key} missing from target map" - assert ( - targets[key] > 0 - ), f"{key} should be positive, got {targets[key]}" - - -def test_compute_state_aggregate(engine): - """State-level targets are summed; national/district excluded.""" - with Session(engine) as session: - # National target (should NOT count) - nat = _make_stratum(session, "0100000US") - nat.targets_rel = [ - Target( - variable="income_tax", - period=2022, - value=999e9, - source_id=5, - active=True, - ) - ] - - # Two state targets (should count) - for fips, val in [("01", 50e9), ("06", 150e9)]: - st = _make_stratum(session, f"0400000US{fips}") - st.targets_rel = [ - Target( - variable="income_tax", - period=2022, - value=val, - source_id=5, - active=True, - ) - ] - - # District target (should NOT count) - dist = _make_stratum(session, "5001800US0601") - dist.targets_rel = [ - Target( - variable="income_tax", - period=2022, - value=30e9, - source_id=5, - active=True, - ) - ] - - session.commit() - - total, count = _compute_state_aggregate( - session, "income_tax", source_id=5 - ) - assert count == 2 - assert total == pytest.approx(200e9) - - -def test_compute_state_aggregate_filters_by_source(engine): - """Same variable with different source_ids are counted separately.""" - with Session(engine) as session: - # Use different states to avoid definition_hash collision. 
- # person_count from census age ETL (source_id=1) - st1 = _make_stratum(session, "0400000US01") - st1.targets_rel = [ - Target( - variable="person_count", - period=2023, - value=5_000_000, - source_id=1, - active=True, - ) - ] - - # person_count from medicaid ETL (source_id=2) - st2 = _make_stratum(session, "0400000US06") - st2.targets_rel = [ - Target( - variable="person_count", - period=2023, - value=1_200_000, - source_id=2, - active=True, - ) - ] - - # person_count from IRS SOI ETL (source_id=5) - st3 = _make_stratum(session, "0400000US48") - st3.targets_rel = [ - Target( - variable="person_count", - period=2022, - value=2_500_000, - source_id=5, - active=True, - ) - ] - - session.commit() - - total_age, count_age = _compute_state_aggregate( - session, "person_count", source_id=1 - ) - total_med, count_med = _compute_state_aggregate( - session, "person_count", source_id=2 - ) - total_soi, count_soi = _compute_state_aggregate( - session, "person_count", source_id=5 - ) - - assert count_age == 1 - assert total_age == pytest.approx(5_000_000) - assert count_med == 1 - assert total_med == pytest.approx(1_200_000) - assert count_soi == 1 - assert total_soi == pytest.approx(2_500_000) - - -def test_scale_targets(engine): - """Targets for a variable+source are scaled; period updated.""" - with Session(engine) as session: - nat = _make_stratum(session, "0100000US") - nat.targets_rel = [ - Target( - variable="eitc", - period=2022, - value=100e9, - source_id=5, - active=True, - ) - ] - st = _make_stratum(session, "0400000US01") - st.targets_rel = [ - Target( - variable="eitc", - period=2022, - value=40e9, - source_id=5, - active=True, - ) - ] - session.commit() - - n = _scale_targets( - session, - "eitc", - source_id=5, - scale_factor=1.5, - target_year=2024, - ) - session.commit() - - assert n == 2 - for t in session.exec( - __import__("sqlmodel") - .select(Target) - .where(Target.variable == "eitc") - ).all(): - assert t.period == 2024 - assert t.value in [ - pytest.approx(150e9), - pytest.approx(60e9), - ] - - -def test_scale_targets_isolates_sources(engine): - """Scaling one source_id does not touch another source_id.""" - with Session(engine) as session: - # Use separate strata to avoid unique-constraint collision - # on (variable, period, stratum_id, reform_id). 
- st1 = _make_stratum(session, "0400000US06") - st1.targets_rel = [ - Target( - variable="person_count", - period=2023, - value=39_000_000, - source_id=1, - active=True, - ), - ] - st2 = _make_stratum(session, "0400000US48") - st2.targets_rel = [ - Target( - variable="person_count", - period=2023, - value=14_000_000, - source_id=2, - active=True, - ), - ] - session.commit() - - # Scale only census age targets (source_id=1) - _scale_targets( - session, - "person_count", - source_id=1, - scale_factor=1.1, - target_year=2024, - ) - session.commit() - - targets = session.exec( - __import__("sqlmodel") - .select(Target) - .where(Target.variable == "person_count") - ).all() - - by_source = {t.source_id: t for t in targets} - # source_id=1 was scaled - assert by_source[1].value == pytest.approx(39_000_000 * 1.1) - assert by_source[1].period == 2024 - # source_id=2 was NOT touched - assert by_source[2].value == pytest.approx(14_000_000) - assert by_source[2].period == 2023 - - -def test_reconcile_targets_scales_correctly(engine): - """End-to-end: reconciliation scales DB to match parameters.""" - auth = _get_authoritative_targets(TARGET_YEAR) - income_tax_target = auth[("income_tax", 5)] - - with Session(engine) as session: - # Seed with a known state aggregate that differs from target - state_total = 0.0 - for fips, share in [ - ("01", 0.02), - ("06", 0.15), - ("48", 0.10), - ]: - # Use an intentionally stale value (half of target) - val = income_tax_target * share * 0.5 - state_total += val - st = _make_stratum(session, f"0400000US{fips}") - st.targets_rel = [ - Target( - variable="income_tax", - period=2022, - value=val, - source_id=5, - active=True, - ) - ] - session.commit() - - factors = reconcile_targets(session, TARGET_YEAR) - - assert ("income_tax", 5) in factors - # After reconciliation state sum should match the target - new_total, _ = _compute_state_aggregate( - session, "income_tax", source_id=5 - ) - assert new_total == pytest.approx(income_tax_target, rel=1e-6) From 77fac1fe37e379dc6ee17897ec96492ff9e598b1 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 2 Feb 2026 13:36:04 +0000 Subject: [PATCH 5/5] Scale IRS SOI 2022 targets to 2024 using CBO projections IRS SOI congressional district data is only available through 2022 (23incd.csv not yet published). To bring these targets to the 2024 simulation year, scale them using CBO/Treasury projections -- the same approach the enhanced CPS calibration (loss.py) uses. Covers: income_tax, unemployment_compensation, eitc, AGI, taxable social security, pensions, capital gains, dividends, interest, partnership/S-corp income, and return counts (person_count). Census age, Medicaid, and SNAP targets are unaffected -- those ETLs already pull 2024 data directly from their administrative sources. 
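Concretely, the scale factor for each variable is the CBO/Treasury
national projection divided by the sum of the state-level 2022 DB
targets, and every row for that variable (national, state, district)
is multiplied by it. A rough sketch of the arithmetic, using made-up
numbers rather than the actual parameter values:

    cbo_income_tax_2024 = 2.5e12  # hypothetical CBO projection
    db_state_sum_2022 = 2.1e12    # hypothetical sum of state SOI targets
    scale = cbo_income_tax_2024 / db_state_sum_2022  # ~1.19
    # each income_tax target row is multiplied by `scale` and its
    # period column is set to 2024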
https://claude.ai/code/session_01GisHzYtJZQQyUfVdRmWV2t --- Makefile | 1 + .../db/scale_irs_soi_to_cbo.py | 230 ++++++++++++++++++ .../tests/test_scale_irs_soi_to_cbo.py | 137 +++++++++++ 3 files changed, 368 insertions(+) create mode 100644 policyengine_us_data/db/scale_irs_soi_to_cbo.py create mode 100644 policyengine_us_data/tests/test_scale_irs_soi_to_cbo.py diff --git a/Makefile b/Makefile index b03e23d55..47e3c5a74 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,7 @@ database: python policyengine_us_data/db/etl_medicaid.py python policyengine_us_data/db/etl_snap.py python policyengine_us_data/db/etl_irs_soi.py + python policyengine_us_data/db/scale_irs_soi_to_cbo.py python policyengine_us_data/db/validate_database.py data: diff --git a/policyengine_us_data/db/scale_irs_soi_to_cbo.py b/policyengine_us_data/db/scale_irs_soi_to_cbo.py new file mode 100644 index 000000000..6d8cdccc9 --- /dev/null +++ b/policyengine_us_data/db/scale_irs_soi_to_cbo.py @@ -0,0 +1,230 @@ +"""Scale IRS SOI 2022 database targets to 2024 using CBO projections. + +The IRS SOI congressional-district data (22incd.csv) is the most recent +available; 2023/2024 CD data has not been published yet. To align the +DB targets with the 2024 simulation year we scale each variable's +aggregate to match the corresponding CBO / Treasury projection, using +the same parameter values that the enhanced CPS calibration uses in +loss.py. + +Only targets with source_id=5 (IRS SOI ETL) are affected. Census, +Medicaid, and SNAP targets already pull 2024 data directly from their +administrative sources. + +See: https://github.com/PolicyEngine/policyengine-us-data/issues/503 +""" + +import logging +from typing import Dict, Tuple + +from sqlalchemy import text +from sqlmodel import Session, create_engine, select + +from policyengine_us.system import system +from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.db.create_database_tables import Target + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +SOI_SOURCE_ID = 5 +SOI_YEAR = 2022 +TARGET_YEAR = 2024 + + +def _get_cbo_targets(year: int) -> Dict[str, float]: + """CBO / Treasury national totals for IRS SOI variables. + + Mirrors the targets used in loss.py for enhanced CPS calibration. + + Args: + year: Simulation year. + + Returns: + Mapping of DB variable name to national total. 
+ """ + p = system.parameters(year) + cbo = p.calibration.gov.cbo + cbo_inc = cbo.income_by_source + soi = p.calibration.gov.irs.soi + + # IRS SOI total returns (sum by filing status) + soi_total_returns = sum( + v + for v in soi.returns_by_filing_status._children.values() + if isinstance(v, (int, float)) + ) + + return { + # CBO budget projections + "income_tax": cbo._children["income_tax"], + "unemployment_compensation": cbo._children[ + "unemployment_compensation" + ], + # Treasury + "eitc": ( + system.parameters.calibration.gov.treasury.tax_expenditures.eitc( + year + ) + ), + # CBO income-by-source + "adjusted_gross_income": cbo_inc._children["adjusted_gross_income"], + "taxable_social_security": cbo_inc._children[ + "taxable_social_security" + ], + "taxable_pension_income": cbo_inc._children["taxable_pension_income"], + "net_capital_gain": cbo_inc._children["net_capital_gain"], + # IRS SOI projections + "qualified_dividend_income": soi._children[ + "qualified_dividend_income" + ], + "taxable_interest_income": soi._children["taxable_interest_income"], + "tax_exempt_interest_income": soi._children[ + "tax_exempt_interest_income" + ], + "tax_unit_partnership_s_corp_income": soi._children[ + "partnership_s_corp_income" + ], + "dividend_income": ( + soi._children["qualified_dividend_income"] + + soi._children["non_qualified_dividend_income"] + ), + # Return counts + "person_count": soi_total_returns, + } + + +def _compute_state_aggregate( + session: Session, variable: str +) -> Tuple[float, int]: + """Sum state-level IRS SOI targets for *variable*. + + Uses raw SQL to avoid the USVariable enum deserialization issue. + + Args: + session: Active database session. + variable: Target variable name. + + Returns: + (sum of state-level values, row count). + """ + result = session.execute( + text(""" + SELECT COALESCE(SUM(t.value), 0) AS total, + COUNT(*) AS cnt + FROM targets t + JOIN stratum_constraints sc + ON sc.stratum_id = t.stratum_id + WHERE t.variable = :variable + AND t.source_id = :source_id + AND t.active = 1 + AND sc.constraint_variable = 'ucgid_str' + AND sc.value LIKE '0400000US%' + """), + {"variable": variable, "source_id": SOI_SOURCE_ID}, + ) + row = result.one() + return float(row.total), int(row.cnt) + + +def _scale_targets( + session: Session, + variable: str, + scale_factor: float, + target_year: int, +) -> int: + """Scale all IRS SOI targets for *variable* and update period. + + Args: + session: Active database session. + variable: Target variable name. + scale_factor: Multiplicative adjustment. + target_year: New period value. + + Returns: + Number of rows updated. + """ + stmt = ( + select(Target) + .where(Target.variable == variable) + .where(Target.source_id == SOI_SOURCE_ID) + ) + all_targets = session.exec(stmt).all() + + updated = 0 + for t in all_targets: + if t.value is not None: + t.value *= scale_factor + t.period = target_year + session.add(t) + updated += 1 + + return updated + + +def scale_soi_to_cbo( + session: Session, + target_year: int = TARGET_YEAR, +) -> Dict[str, float]: + """Scale IRS SOI DB targets to CBO 2024 projections. + + For each variable with a CBO/Treasury projection: + 1. Sum current state-level DB targets. + 2. Compute scale factor = CBO target / DB aggregate. + 3. Apply proportionally to all geographic levels. + + Args: + session: Active database session. + target_year: Simulation year. + + Returns: + Mapping of variable name to scale factor applied. 
+    """
+    cbo_targets = _get_cbo_targets(target_year)
+    scale_factors: Dict[str, float] = {}
+
+    for variable, cbo_value in cbo_targets.items():
+        state_sum, state_count = _compute_state_aggregate(session, variable)
+
+        if state_sum == 0:
+            logger.warning(
+                "Skipping '%s': no state-level SOI targets",
+                variable,
+            )
+            continue
+
+        scale = cbo_value / state_sum
+        pct = (scale - 1) * 100
+
+        logger.info(
+            "%-35s %4d states %.3g -> %.3g " "(x%.4f, %+.1f%%)",
+            variable,
+            state_count,
+            state_sum,
+            cbo_value,
+            scale,
+            pct,
+        )
+
+        n = _scale_targets(session, variable, scale, target_year)
+        logger.info("  Updated %d rows", n)
+        scale_factors[variable] = scale
+
+    session.commit()
+    logger.info("IRS SOI -> CBO scaling complete.")
+    return scale_factors
+
+
+def main() -> None:
+    db_url = f"sqlite:///{STORAGE_FOLDER / 'policy_data.db'}"
+    engine = create_engine(db_url)
+
+    with Session(engine) as session:
+        scale_soi_to_cbo(session)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/policyengine_us_data/tests/test_scale_irs_soi_to_cbo.py b/policyengine_us_data/tests/test_scale_irs_soi_to_cbo.py
new file mode 100644
index 000000000..b947723e1
--- /dev/null
+++ b/policyengine_us_data/tests/test_scale_irs_soi_to_cbo.py
@@ -0,0 +1,137 @@
+"""Tests for CBO-based scaling of IRS SOI database targets."""
+
+import pytest
+from sqlmodel import Session, select
+
+from policyengine_us_data.db.create_database_tables import (
+    Stratum,
+    StratumConstraint,
+    Target,
+    create_database,
+)
+from policyengine_us_data.db.scale_irs_soi_to_cbo import (
+    SOI_SOURCE_ID,
+    TARGET_YEAR,
+    _compute_state_aggregate,
+    _get_cbo_targets,
+    _scale_targets,
+    scale_soi_to_cbo,
+)
+
+
+@pytest.fixture
+def engine(tmp_path):
+    db_uri = f"sqlite:///{tmp_path / 'test.db'}"
+    return create_database(db_uri)
+
+
+def _make_stratum(session, ucgid):
+    """Helper: create a stratum with a ucgid_str constraint."""
+    s = Stratum(stratum_group_id=0, notes=f"Geo: {ucgid}")
+    s.constraints_rel = [
+        StratumConstraint(
+            constraint_variable="ucgid_str",
+            operation="in",
+            value=ucgid,
+        )
+    ]
+    session.add(s)
+    session.flush()
+    return s
+
+
+CBO_VARIABLES = [
+    "income_tax",
+    "unemployment_compensation",
+    "eitc",
+    "adjusted_gross_income",
+    "taxable_social_security",
+    "taxable_pension_income",
+    "net_capital_gain",
+    "qualified_dividend_income",
+    "taxable_interest_income",
+    "tax_exempt_interest_income",
+    "tax_unit_partnership_s_corp_income",
+    "dividend_income",
+    "person_count",
+]
+
+
+def test_cbo_targets_are_positive():
+    """All CBO targets return positive 2024 values."""
+    targets = _get_cbo_targets(TARGET_YEAR)
+    for name in CBO_VARIABLES:
+        assert name in targets, f"'{name}' missing"
+        assert targets[name] > 0, f"{name} = {targets[name]}"
+
+
+def test_only_soi_targets_are_scaled(engine):
+    """Scaling only affects source_id=5, not other sources."""
+    with Session(engine) as session:
+        st = _make_stratum(session, "0400000US06")
+        st.targets_rel = [
+            Target(
+                variable="income_tax",
+                period=2022,
+                value=300e9,
+                source_id=SOI_SOURCE_ID,
+                active=True,
+            ),
+        ]
+        st2 = _make_stratum(session, "0400000US48")
+        st2.targets_rel = [
+            Target(
+                variable="person_count",
+                period=2023,
+                value=30_000_000,
+                source_id=1,  # Census age, not SOI
+                active=True,
+            ),
+        ]
+        session.commit()
+
+        _scale_targets(session, "income_tax", 1.5, 2024)
+        session.commit()
+
+        soi_t = session.exec(
+            select(Target)
+            .where(Target.variable == "income_tax")
+            .where(Target.source_id == SOI_SOURCE_ID)
+        ).one()
+        assert soi_t.value == pytest.approx(450e9)
+        assert soi_t.period == 2024
+
+        census_t = session.exec(
+            select(Target)
+            .where(Target.variable == "person_count")
+            .where(Target.source_id == 1)
+        ).one()
+        assert census_t.value == pytest.approx(30_000_000)
+        assert census_t.period == 2023
+
+
+def test_end_to_end_scaling(engine):
+    """After scaling, state aggregate matches CBO target."""
+    cbo = _get_cbo_targets(TARGET_YEAR)
+    cbo_income_tax = cbo["income_tax"]
+
+    with Session(engine) as session:
+        for fips, share in [("01", 0.02), ("06", 0.15), ("48", 0.10)]:
+            val = cbo_income_tax * share * 0.5  # intentionally stale
+            st = _make_stratum(session, f"0400000US{fips}")
+            st.targets_rel = [
+                Target(
+                    variable="income_tax",
+                    period=2022,
+                    value=val,
+                    source_id=SOI_SOURCE_ID,
+                    active=True,
+                )
+            ]
+        session.commit()
+
+        factors = scale_soi_to_cbo(session, TARGET_YEAR)
+
+        assert "income_tax" in factors
+        new_total, _ = _compute_state_aggregate(session, "income_tax")
+        assert new_total == pytest.approx(cbo_income_tax, rel=1e-6)
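
After the Makefile's database target runs the new script, the scaling can be
spot-checked with a short snippet like the one below (a sketch only; it
assumes the default policy_data.db location that the ETLs write to):

    from sqlmodel import Session, create_engine

    from policyengine_us_data.db.scale_irs_soi_to_cbo import (
        TARGET_YEAR,
        _compute_state_aggregate,
        _get_cbo_targets,
    )
    from policyengine_us_data.storage import STORAGE_FOLDER

    engine = create_engine(f"sqlite:///{STORAGE_FOLDER / 'policy_data.db'}")
    with Session(engine) as session:
        state_sum, n_states = _compute_state_aggregate(session, "income_tax")
        cbo_target = _get_cbo_targets(TARGET_YEAR)["income_tax"]
        # After scaling, the state aggregate should match the CBO projection.
        print(n_states, state_sum, cbo_target)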