def _build_agi_block_probs(cds, pop_probs, cd_agi_targets):
    """Reweight block probabilities to match district AGI target shares.

    District totals are made proportional to ``cd_agi_targets`` while the
    block shares inside each district keep the original population-weighted
    distribution.

    Args:
        cds: One district id per block (1-D array-like), aligned with
            ``pop_probs``.
        pop_probs: Population-based probability for each block.
        cd_agi_targets: Mapping of district id to AGI target. Ids missing
            from the mapping, and targets that are negative, ``None``, NaN
            or infinite, contribute zero weight.

    Returns:
        A float64 array of block probabilities that sums to 1. When no block
        ends up with positive AGI-weighted mass, ``pop_probs`` is returned
        unchanged so the caller falls back to population sampling.
    """
    block_probs = np.asarray(pop_probs, dtype=np.float64)

    # dtype=float64 turns ``None`` into NaN so it is caught by the finite
    # check below instead of producing an object array.
    agi_weights = np.array(
        [cd_agi_targets.get(cd, 0.0) for cd in cds],
        dtype=np.float64,
    )
    # A single NaN target would otherwise survive the zero-sum guard and
    # turn every returned probability into NaN.
    usable = np.isfinite(agi_weights) & (agi_weights > 0.0)
    agi_weights = np.where(usable, agi_weights, 0.0)
    if agi_weights.sum() == 0:
        return pop_probs

    # Total population mass of each block's district, broadcast back to the
    # blocks. NumPy-only grouping: works for lists as well as arrays.
    _, district_index = np.unique(np.asarray(cds), return_inverse=True)
    district_index = district_index.ravel()
    district_pop_mass = np.bincount(district_index, weights=block_probs)[
        district_index
    ]

    # Dividing by the district mass removes the population size from the
    # district total, leaving it proportional to the AGI target alone.
    agi_probs = np.divide(
        block_probs * agi_weights,
        district_pop_mass,
        out=np.zeros_like(block_probs, dtype=np.float64),
        where=district_pop_mass > 0,
    )
    total = agi_probs.sum()
    if total == 0:
        return pop_probs
    return agi_probs / total
def get_district_agi_targets(self) -> Dict[str, float]:
    """Return current-law district AGI targets for geography assignment.

    Queries ``adjusted_gross_income`` targets through ``_query_targets`` and
    keeps only rows that are district-level (``geo_level == "district"``),
    baseline (``reform_id == 0``) and have no domain variable (missing or
    empty ``domain_variable``).

    Returns:
        Mapping of ``str(geographic_id)`` to ``float(value)`` for each
        retained row. If an id occurs more than once, the last row wins.
    """
    targets_df = self._query_targets({"variables": ["adjusted_gross_income"]})
    is_baseline_district = (
        (targets_df["geo_level"] == "district")
        & (targets_df["reform_id"] == 0)
        & (targets_df["domain_variable"].fillna("") == "")
    )
    district_rows = targets_df.loc[is_baseline_district]
    # Zip the two columns instead of iterrows(): no per-row Series is built
    # and each column keeps its own dtype before str()/float() are applied.
    return {
        str(geo_id): float(value)
        for geo_id, value in zip(
            district_rows["geographic_id"], district_rows["value"]
        )
    }
class TestRunCalibrationAgiTargets:
    """Check that run_calibration reads district AGI targets from the requested DB."""

    def test_uses_requested_db_for_district_agi_targets(self):
        """Run run_calibration with fakes and stop at geography assignment.

        The simulation, matrix builder and geography assignment are replaced
        by fakes. The fake assignment raises, so nothing after the
        conditional geography step runs; the test then inspects what the
        builder and the assignment function were called with.
        """
        from policyengine_us_data.calibration.unified_calibration import (
            run_calibration,
        )

        captured = {}

        class StopAfterAssignment(RuntimeError):
            """Raised by the fake assignment to abort run_calibration early."""

        class FakeMicrosimulation:
            """Two-household stand-in for the real microsimulation."""

            def __init__(self, dataset, reform=None):
                self.dataset = SimpleNamespace(
                    load_dataset=lambda: {"household_id": {2024: np.array([1, 2])}}
                )

            def calculate(self, variable, *args, **kwargs):
                if variable == "household_id":
                    return SimpleNamespace(values=np.array([1, 2], dtype=np.int64))
                if variable == "adjusted_gross_income":
                    return SimpleNamespace(
                        values=np.array([100.0, 200.0], dtype=np.float64)
                    )
                # Any other variable means the code under test changed.
                raise AssertionError(f"Unexpected calculate({variable!r})")

        class FakeBuilder:
            """Records constructor arguments and serves a fixed target dict."""

            def __init__(self, db_uri, time_period, dataset_path=None):
                captured["db_uri"] = db_uri
                captured["time_period"] = time_period
                captured["dataset_path_at_init"] = dataset_path

            def get_district_agi_targets(self):
                return {"601": 123.0}

        def fake_assign_random_geography(**kwargs):
            captured["assign_kwargs"] = kwargs
            raise StopAfterAssignment

        with (
            patch("policyengine_us.Microsimulation", FakeMicrosimulation),
            patch(
                "policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder",
                FakeBuilder,
            ),
            patch(
                "policyengine_us_data.calibration.clone_and_assign.assign_random_geography",
                fake_assign_random_geography,
            ),
        ):
            with pytest.raises(StopAfterAssignment):
                run_calibration(
                    dataset_path="input.h5",
                    db_path="/tmp/custom-policy-data.db",
                    n_clones=2,
                )

        assert captured["db_uri"] == "sqlite:////tmp/custom-policy-data.db"
        assert captured["time_period"] == 2024
        # The builder is created before the matrix dataset is chosen, so no
        # dataset path may be passed to its constructor.
        assert captured["dataset_path_at_init"] is None
        assert captured["assign_kwargs"]["cd_agi_targets"] == {"601": 123.0}
"0"), (15, 8, "congressional_district_geoid", "=", "3702"), (16, 9, "aca_ptc", ">", "0"), + (17, 10, "congressional_district_geoid", "=", "601"), + (18, 11, "congressional_district_geoid", "=", "602"), ] for cid, sid, var, op, val in constraints: conn.execute( @@ -183,6 +185,9 @@ def _insert_aca_ptc_data(engine): (17, 9, "person_count", 0, 19743689.0, 2024, 1), (18, 1, "aca_ptc", 1, 999.0, 2022, 1), (19, 1, "aca_ptc", 0, 12345.0, 2024, 0), + (20, 10, "adjusted_gross_income", 0, 1000.0, 2021, 1), + (21, 10, "adjusted_gross_income", 0, 1500.0, 2022, 1), + (22, 11, "adjusted_gross_income", 0, 800.0, 2022, 1), ] for tid, sid, var, reform_id, val, period, active in targets: conn.execute( @@ -297,6 +302,13 @@ def test_target_name_adds_expenditure_suffix_for_reforms(self): ) self.assertEqual(name, "national/salt_deduction_expenditure") + def test_get_district_agi_targets_uses_requested_db_periods(self): + b = self._make_builder(time_period=2024) + self.assertEqual( + b.get_district_agi_targets(), + {"601": 1500.0, "602": 800.0}, + ) + class TestHierarchicalUprating(unittest.TestCase): @classmethod