Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/fix-agi-geography-targeting.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix district AGI geography assignment to match target shares and use the requested calibration database when loading district AGI targets.
20 changes: 18 additions & 2 deletions policyengine_us_data/calibration/clone_and_assign.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,28 @@ def load_global_block_distribution():


def _build_agi_block_probs(cds, pop_probs, cd_agi_targets):
"""Multiply population block probs by CD AGI target weights."""
"""Reweight block probabilities to match district AGI target shares.

District totals should be proportional to ``cd_agi_targets``, while
block shares within each district should preserve the original
population-weighted distribution.
"""
agi_weights = np.array([cd_agi_targets.get(cd, 0.0) for cd in cds])
agi_weights = np.maximum(agi_weights, 0.0)
if agi_weights.sum() == 0:
return pop_probs
agi_probs = pop_probs * agi_weights

district_pop_mass = (
pd.Series(pop_probs, copy=False).groupby(cds).transform("sum").to_numpy()
)
agi_probs = np.divide(
pop_probs * agi_weights,
district_pop_mass,
out=np.zeros_like(pop_probs, dtype=np.float64),
where=district_pop_mass > 0,
)
if agi_probs.sum() == 0:
return pop_probs
return agi_probs / agi_probs.sum()


Expand Down
30 changes: 8 additions & 22 deletions policyengine_us_data/calibration/unified_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,28 +931,19 @@ def run_calibration(
time_period,
)

db_uri = f"sqlite:///{db_path}"
builder = UnifiedMatrixBuilder(
db_uri=db_uri,
time_period=time_period,
)

# Compute base household AGI for conditional geographic assignment
base_agi = sim.calculate("adjusted_gross_income", map_to="household").values.astype(
np.float64
)

# Load CD-level AGI targets from database
import sqlite3

from policyengine_us_data.storage import STORAGE_FOLDER

db_path = str(STORAGE_FOLDER / "calibration" / "policy_data.db")
conn = sqlite3.connect(db_path)
rows = conn.execute(
"SELECT sc.value, t.value "
"FROM targets t "
"JOIN stratum_constraints sc ON t.stratum_id = sc.stratum_id "
"WHERE t.variable = 'adjusted_gross_income' "
"AND sc.constraint_variable = 'congressional_district_geoid' "
"AND t.active = 1"
).fetchall()
conn.close()
cd_agi_targets = {str(row[0]): float(row[1]) for row in rows}
cd_agi_targets = builder.get_district_agi_targets()
logger.info(
"Loaded %d CD AGI targets for conditional assignment",
len(cd_agi_targets),
Expand Down Expand Up @@ -1033,12 +1024,7 @@ def run_calibration(
# Step 6: Build sparse calibration matrix
do_rerandomize = not skip_takeup_rerandomize
t_matrix = time.time()
db_uri = f"sqlite:///{db_path}"
builder = UnifiedMatrixBuilder(
db_uri=db_uri,
time_period=time_period,
dataset_path=dataset_for_matrix,
)
builder.dataset_path = dataset_for_matrix
targets_df, X_sparse, target_names = builder.build_matrix(
geography=geography,
sim=sim,
Expand Down
13 changes: 13 additions & 0 deletions policyengine_us_data/calibration/unified_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,19 @@ def _query_targets(self, target_filter: dict) -> pd.DataFrame:
params={"time_period": self.time_period},
)

def get_district_agi_targets(self) -> Dict[str, float]:
"""Return current-law district AGI targets for geography assignment."""
targets_df = self._query_targets({"variables": ["adjusted_gross_income"]})
district_rows = targets_df[
(targets_df["geo_level"] == "district")
& (targets_df["reform_id"] == 0)
& (targets_df["domain_variable"].fillna("") == "")
]
return {
str(row["geographic_id"]): float(row["value"])
for _, row in district_rows.iterrows()
}

# ---------------------------------------------------------------
# Uprating
# ---------------------------------------------------------------
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/calibration/test_clone_and_assign.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from policyengine_us_data.calibration.clone_and_assign import (
GeographyAssignment,
_build_agi_block_probs,
load_global_block_distribution,
assign_random_geography,
double_geography_for_puf,
Expand Down Expand Up @@ -87,6 +88,25 @@ def test_state_fips_extracted(self, tmp_path):


class TestAssignRandomGeography:
def test_build_agi_block_probs_matches_district_target_shares(self):
cds = np.array(["101", "101", "102", "102"])
pop_probs = np.array([0.45, 0.45, 0.05, 0.05], dtype=np.float64)
agi_targets = {"101": 1.0, "102": 3.0}

agi_probs = _build_agi_block_probs(cds, pop_probs, agi_targets)

by_cd = {cd: agi_probs[cds == cd].sum() for cd in np.unique(cds)}
np.testing.assert_allclose(by_cd["101"], 0.25)
np.testing.assert_allclose(by_cd["102"], 0.75)
np.testing.assert_allclose(
agi_probs[cds == "101"] / agi_probs[cds == "101"].sum(),
pop_probs[cds == "101"] / pop_probs[cds == "101"].sum(),
)
np.testing.assert_allclose(
agi_probs[cds == "102"] / agi_probs[cds == "102"].sum(),
pop_probs[cds == "102"] / pop_probs[cds == "102"].sum(),
)

@patch(
"policyengine_us_data.calibration.clone_and_assign"
".load_global_block_distribution"
Expand Down
65 changes: 65 additions & 0 deletions tests/unit/calibration/test_unified_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"""

import numpy as np
import pytest
from types import SimpleNamespace
from unittest.mock import patch

from policyengine_us_data.utils.randomness import seeded_rng
from policyengine_us_data.utils.takeup import (
Expand Down Expand Up @@ -352,6 +355,68 @@ def test_county_fips_length(self):
assert all(len(c) == 5 for c in ga.county_fips)


class TestRunCalibrationAgiTargets:
def test_uses_requested_db_for_district_agi_targets(self):
from policyengine_us_data.calibration.unified_calibration import (
run_calibration,
)

captured = {}

class StopAfterAssignment(RuntimeError):
pass

class FakeMicrosimulation:
def __init__(self, dataset, reform=None):
self.dataset = SimpleNamespace(
load_dataset=lambda: {"household_id": {2024: np.array([1, 2])}}
)

def calculate(self, variable, *args, **kwargs):
if variable == "household_id":
return SimpleNamespace(values=np.array([1, 2], dtype=np.int64))
if variable == "adjusted_gross_income":
return SimpleNamespace(
values=np.array([100.0, 200.0], dtype=np.float64)
)
raise AssertionError(f"Unexpected calculate({variable!r})")

class FakeBuilder:
def __init__(self, db_uri, time_period, dataset_path=None):
captured["db_uri"] = db_uri
captured["time_period"] = time_period
captured["dataset_path_at_init"] = dataset_path

def get_district_agi_targets(self):
return {"601": 123.0}

def fake_assign_random_geography(**kwargs):
captured["assign_kwargs"] = kwargs
raise StopAfterAssignment

with (
patch("policyengine_us.Microsimulation", FakeMicrosimulation),
patch(
"policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder",
FakeBuilder,
),
patch(
"policyengine_us_data.calibration.clone_and_assign.assign_random_geography",
fake_assign_random_geography,
),
):
with pytest.raises(StopAfterAssignment):
run_calibration(
dataset_path="input.h5",
db_path="/tmp/custom-policy-data.db",
n_clones=2,
)

assert captured["db_uri"] == "sqlite:////tmp/custom-policy-data.db"
assert captured["time_period"] == 2024
assert captured["assign_kwargs"]["cd_agi_targets"] == {"601": 123.0}


class TestBlockTakeupSeeding:
"""Verify compute_block_takeup_for_entities is
reproducible and clone-dependent."""
Expand Down
14 changes: 13 additions & 1 deletion tests/unit/calibration/test_unified_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _create_legacy_target_overview(engine):

def _insert_aca_ptc_data(engine):
with engine.connect() as conn:
strata = [1, 2, 3, 4, 5, 6, 7, 8, 9]
strata = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
for sid in strata:
conn.execute(
text(
Expand Down Expand Up @@ -147,6 +147,8 @@ def _insert_aca_ptc_data(engine):
(14, 8, "aca_ptc", ">", "0"),
(15, 8, "congressional_district_geoid", "=", "3702"),
(16, 9, "aca_ptc", ">", "0"),
(17, 10, "congressional_district_geoid", "=", "601"),
(18, 11, "congressional_district_geoid", "=", "602"),
]
for cid, sid, var, op, val in constraints:
conn.execute(
Expand Down Expand Up @@ -183,6 +185,9 @@ def _insert_aca_ptc_data(engine):
(17, 9, "person_count", 0, 19743689.0, 2024, 1),
(18, 1, "aca_ptc", 1, 999.0, 2022, 1),
(19, 1, "aca_ptc", 0, 12345.0, 2024, 0),
(20, 10, "adjusted_gross_income", 0, 1000.0, 2021, 1),
(21, 10, "adjusted_gross_income", 0, 1500.0, 2022, 1),
(22, 11, "adjusted_gross_income", 0, 800.0, 2022, 1),
]
for tid, sid, var, reform_id, val, period, active in targets:
conn.execute(
Expand Down Expand Up @@ -297,6 +302,13 @@ def test_target_name_adds_expenditure_suffix_for_reforms(self):
)
self.assertEqual(name, "national/salt_deduction_expenditure")

def test_get_district_agi_targets_uses_requested_db_periods(self):
b = self._make_builder(time_period=2024)
self.assertEqual(
b.get_district_agi_targets(),
{"601": 1500.0, "602": 800.0},
)


class TestHierarchicalUprating(unittest.TestCase):
@classmethod
Expand Down
Loading