From 547d5189f94cd58d2dd16efd4c999ede052cb354 Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 11:41:09 -0400 Subject: [PATCH 01/10] Add research-grade DGP parameters to generate_survey_did_data() Add 6 new optional parameters (icc, weight_cv, informative_sampling, heterogeneous_te_by_strata, strata_sizes, return_true_population_att) that enable coverage studies and tutorials demonstrating when survey- design methods outperform naive estimators. All default to current behavior for backward compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 229 +++++++++++++++++++++++++++++++++++++++--- tests/test_prep.py | 174 ++++++++++++++++++++++++++++++++ 2 files changed, 389 insertions(+), 14 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 914a68d9..48f43974 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1129,6 +1129,36 @@ def generate_staggered_ddd_data( return pd.DataFrame(records) +def _rank_pair_weights( + unit_weight: np.ndarray, + unit_stratum: np.ndarray, + y0: np.ndarray, + n_strata: int, +) -> None: + """Rank-pair weights with Y(0) within each stratum (in-place). + + High-outcome units receive lower weights, modeling informative sampling + where hard-to-reach (high-outcome) subpopulations are under-covered. + """ + for s in range(n_strata): + mask = unit_stratum == s + n_s = mask.sum() + if n_s <= 1: + continue + idx_s = np.where(mask)[0] + w_vals = unit_weight[idx_s].copy() + if w_vals.std() < 1e-10: + # No weight variation: create inverse-rank weights, mean=1 + ranks = np.argsort(np.argsort(y0[idx_s])) + inv_rank = (n_s - ranks).astype(float) + unit_weight[idx_s] = inv_rank / inv_rank.mean() + else: + # Rank-pair: highest Y(0) gets lightest weight + y0_order = np.argsort(-y0[idx_s]) + w_sorted = np.sort(w_vals) + unit_weight[idx_s[y0_order]] = w_sorted + + def generate_survey_did_data( n_units: int = 200, n_periods: int = 8, @@ -1149,6 +1179,13 @@ def generate_survey_did_data( add_covariates: bool = False, panel: bool = True, seed: Optional[int] = None, + # --- Research-grade DGP parameters --- + icc: Optional[float] = None, + weight_cv: Optional[float] = None, + informative_sampling: bool = False, + heterogeneous_te_by_strata: bool = False, + strata_sizes: Optional[List[int]] = None, + return_true_population_att: bool = False, ) -> pd.DataFrame: """ Generate synthetic staggered DiD data with survey structure. @@ -1215,6 +1252,37 @@ def generate_survey_did_data( CallawaySantAnna(panel=False)). seed : int, optional Random seed for reproducibility. + icc : float, optional + Target intra-class correlation coefficient (0 < icc < 1). Overrides + ``psu_re_sd`` via the variance decomposition: + ``psu_re_sd = sqrt(icc * (unit_fe_sd^2 + noise_sd^2) / + ((1 - icc) * (1 + psu_period_factor^2)))``. + Cannot be combined with a non-default ``psu_re_sd``. + weight_cv : float, optional + Target coefficient of variation for sampling weights. Generates + LogNormal weights normalized to mean 1, bypassing ``weight_variation``. + Cannot be combined with a non-default ``weight_variation``. + informative_sampling : bool, default=False + If True, sampling weights correlate with Y(0) — high-outcome units + receive lower weights (under-coverage). Uses rank-pairing within + each stratum. For panel data, ranking is done once from period-1 + outcomes. For repeated cross-sections, ranking is refreshed each + period. When ``weight_variation="none"`` and no ``weight_cv``, + creates inverse-rank weights normalized to mean 1. + heterogeneous_te_by_strata : bool, default=False + If True, treatment effect varies by stratum: + ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap + between unweighted and population ATT. With ``n_strata=1``, + all units receive the base ``treatment_effect``. + strata_sizes : list of int, optional + Custom per-stratum unit counts. Must have length ``n_strata`` and + sum to ``n_units``. Replaces equal allocation across strata. + return_true_population_att : bool, default=False + If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` + with keys: ``population_att`` (weight-weighted average of treated + true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects`` + (dict mapping stratum index to TE), ``icc_realized`` (between/total + variance ratio from generated data). Returns ------- @@ -1222,6 +1290,8 @@ def generate_survey_did_data( Columns: unit, period, outcome, first_treat, treated, true_effect, stratum, psu, fpc, weight. Also rep_0..rep_K if include_replicate_weights=True, and x1, x2 if add_covariates=True. + If ``return_true_population_att=True``, ``df.attrs["dgp_truth"]`` + contains DGP diagnostics. """ rng = np.random.default_rng(seed) @@ -1284,30 +1354,82 @@ def generate_survey_did_data( f"weight_variation must be one of {valid_wv}, got {weight_variation!r}" ) + # --- Validate research-grade DGP parameters --- + if icc is not None: + if not (0 < icc < 1): + raise ValueError(f"icc must be between 0 and 1 (exclusive), got {icc}") + if psu_re_sd != 2.0: + raise ValueError( + "Cannot specify both icc and a non-default psu_re_sd. " + "icc overrides psu_re_sd via the ICC formula." + ) + + if weight_cv is not None: + if weight_cv <= 0: + raise ValueError(f"weight_cv must be positive, got {weight_cv}") + if weight_variation != "moderate": + raise ValueError( + "Cannot specify both weight_cv and a non-default " + "weight_variation. weight_cv overrides weight_variation." + ) + + if strata_sizes is not None: + strata_sizes = list(strata_sizes) + if len(strata_sizes) != n_strata: + raise ValueError( + f"strata_sizes must have length n_strata={n_strata}, " + f"got {len(strata_sizes)}" + ) + if any(s < 1 for s in strata_sizes): + raise ValueError("All strata_sizes must be >= 1") + if sum(strata_sizes) != n_units: + raise ValueError( + f"strata_sizes must sum to n_units={n_units}, " + f"got {sum(strata_sizes)}" + ) + + # --- ICC -> psu_re_sd resolution --- + if icc is not None: + psu_re_sd = np.sqrt( + icc * (unit_fe_sd**2 + noise_sd**2) + / ((1 - icc) * (1 + psu_period_factor**2)) + ) + # --- Survey structure: assign units to strata and PSUs --- n_psu_total = n_strata * psu_per_stratum - units_per_stratum = n_units // n_strata - remainder = n_units % n_strata + + if strata_sizes is not None: + stratum_n = strata_sizes + else: + units_per_stratum = n_units // n_strata + remainder = n_units % n_strata + stratum_n = [ + units_per_stratum + (1 if s < remainder else 0) + for s in range(n_strata) + ] unit_stratum = np.empty(n_units, dtype=int) unit_psu = np.empty(n_units, dtype=int) idx = 0 for s in range(n_strata): - # Distribute remainder units across first strata - n_s = units_per_stratum + (1 if s < remainder else 0) + n_s = stratum_n[s] unit_stratum[idx : idx + n_s] = s - - # Assign PSUs within this stratum psu_start = s * psu_per_stratum for j in range(n_s): unit_psu[idx + j] = psu_start + (j % psu_per_stratum) idx += n_s - # Sampling weights: vary by stratum (inverse selection probability) - scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0} - scale = scale_map.get(weight_variation, 1.0) - denom = max(n_strata - 1, 1) - unit_weight = 1.0 + scale * (unit_stratum / denom) + # Sampling weights + if weight_cv is not None: + sigma_ln = np.sqrt(np.log(1 + weight_cv**2)) + raw_w = rng.lognormal(-sigma_ln**2 / 2, sigma_ln, size=n_units) + unit_weight = raw_w / raw_w.mean() + else: + # Stratum-based weights (inverse selection probability) + scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0} + scale = scale_map.get(weight_variation, 1.0) + denom = max(n_strata - 1, 1) + unit_weight = 1.0 + scale * (unit_stratum / denom) # --- Treatment assignment (cohort structure) --- n_never = int(n_units * never_treated_frac) @@ -1344,6 +1466,33 @@ def generate_survey_did_data( 0, psu_re_sd * psu_period_factor, size=(n_psu_total, n_periods) ) + # --- Informative sampling (panel path): pre-draw FEs, rank-pair weights --- + if informative_sampling and panel: + _panel_unit_fe = rng.normal(0, unit_fe_sd, size=n_units) + y0_period1 = ( + _panel_unit_fe + + psu_re[unit_psu] + + psu_period_re[unit_psu, 0] + + 0.5 + ) + _rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata) + + # Save base weights for cross-section informative sampling (reset each period) + if informative_sampling and not panel: + _base_weight = unit_weight.copy() + + # --- Heterogeneous treatment effects by stratum --- + if heterogeneous_te_by_strata: + if n_strata == 1: + te_by_stratum = np.array([treatment_effect]) + else: + strata_idx = np.arange(n_strata, dtype=float) + te_by_stratum = treatment_effect * ( + 1 + 0.5 * (strata_idx - strata_idx.mean()) / strata_idx.std() + ) + else: + te_by_stratum = None + # --- Generate panel or repeated cross-sections --- records = [] for t in range(1, n_periods + 1): @@ -1351,11 +1500,24 @@ def generate_survey_did_data( unit_fe = rng.normal(0, unit_fe_sd, size=n_units) if panel and t > 1: pass # reuse unit_fe from first period (set below) - if panel and t == 1: + if informative_sampling and panel: + unit_fe = _panel_unit_fe # use pre-drawn FEs + elif panel and t == 1: _panel_unit_fe = unit_fe # save for reuse - if panel and t > 1: + elif panel and t > 1: unit_fe = _panel_unit_fe # type: ignore[possibly-undefined] + # Cross-section informative sampling: re-rank weights each period + if informative_sampling and not panel: + unit_weight = _base_weight.copy() # type: ignore[possibly-undefined] + y0_t = ( + unit_fe + + psu_re[unit_psu] + + psu_period_re[unit_psu, t - 1] + + 0.5 * t + ) + _rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata) + x1 = rng.normal(0, 1, size=n_units) if add_covariates else None if panel and t > 1 and add_covariates: x1 = _panel_x1 # type: ignore[possibly-undefined] @@ -1379,7 +1541,10 @@ def generate_survey_did_data( treated = int(g_i > 0 and t >= g_i) true_eff = 0.0 if treated: - true_eff = treatment_effect + if te_by_stratum is not None: + true_eff = float(te_by_stratum[unit_stratum[i]]) + else: + true_eff = treatment_effect if dynamic_effects: true_eff *= 1 + effect_growth * (t - g_i) y += true_eff @@ -1426,4 +1591,40 @@ def generate_survey_did_data( w_r[w_r > 0] *= n_rep / (n_rep - 1) df[f"rep_{r}"] = w_r + # --- DGP truth diagnostics --- + if return_true_population_att: + treated_mask = df["treated"] == 1 + if treated_mask.any(): + w_treated = df.loc[treated_mask, "weight"].values + te_treated = df.loc[treated_mask, "true_effect"].values + population_att = float(np.average(te_treated, weights=w_treated)) + else: + population_att = 0.0 + + if te_by_stratum is not None: + stratum_effects = { + int(s): float(te_by_stratum[s]) for s in range(n_strata) + } + else: + stratum_effects = { + int(s): float(treatment_effect) for s in range(n_strata) + } + + # Kish DEFF from weight variation + w_all = df.groupby("unit")["weight"].first().values + cv_w = float(w_all.std() / w_all.mean()) if w_all.mean() > 0 else 0.0 + deff_kish = 1 + cv_w**2 + + # Realized ICC (between-PSU / total variance ratio) + psu_means = df.groupby("psu")["outcome"].mean() + total_var = df["outcome"].var() + icc_realized = float(psu_means.var() / total_var) if total_var > 0 else 0.0 + + df.attrs["dgp_truth"] = { + "population_att": population_att, + "deff_kish": float(deff_kish), + "stratum_effects": stratum_effects, + "icc_realized": icc_realized, + } + return df diff --git a/tests/test_prep.py b/tests/test_prep.py index 72491a25..7ae4ba46 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1504,3 +1504,177 @@ def test_psu_period_factor_validation(self): generate_survey_did_data(psu_period_factor=math.nan, seed=42) with pytest.raises(ValueError, match="psu_period_factor"): generate_survey_did_data(psu_period_factor=math.inf, seed=42) + + +class TestSurveyDGPResearchGrade: + """Tests for research-grade DGP parameters added to generate_survey_did_data.""" + + def test_icc_parameter(self): + """Realized ICC should be within 50% relative tolerance of target.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_icc = 0.3 + df = generate_survey_did_data( + n_units=1000, icc=target_icc, seed=42 + ) + # ANOVA-based ICC on period 1 (pre-treatment, no TE contamination) + p1 = df[df["period"] == 1] + groups = p1.groupby("psu")["outcome"] + grand_mean = p1["outcome"].mean() + n_total = len(p1) + n_groups = groups.ngroups + n_bar = n_total / n_groups + ssb = (groups.size() * (groups.mean() - grand_mean) ** 2).sum() + msb = ssb / (n_groups - 1) + ssw = groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + msw = ssw / (n_total - n_groups) + realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) + assert abs(realized_icc - target_icc) / target_icc < 0.50 + + def test_icc_and_psu_re_sd_conflict(self): + """Cannot specify both icc and a non-default psu_re_sd.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="Cannot specify both icc"): + generate_survey_did_data(icc=0.3, psu_re_sd=3.0, seed=42) + + def test_icc_out_of_range(self): + """icc must be in (0, 1).""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="icc must be between"): + generate_survey_did_data(icc=0.0, seed=42) + with pytest.raises(ValueError, match="icc must be between"): + generate_survey_did_data(icc=1.0, seed=42) + + def test_weight_cv_parameter(self): + """Realized weight CV should be within 0.15 of target.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_cv = 0.5 + df = generate_survey_did_data( + n_units=1000, weight_cv=target_cv, seed=42 + ) + weights = df.groupby("unit")["weight"].first().values + realized_cv = weights.std() / weights.mean() + assert abs(realized_cv - target_cv) < 0.15 + + def test_weight_cv_and_weight_variation_conflict(self): + """Cannot specify both weight_cv and a non-default weight_variation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="Cannot specify both weight_cv"): + generate_survey_did_data( + weight_cv=0.5, weight_variation="high", seed=42 + ) + + def test_informative_sampling_panel(self): + """Informative sampling should create weight-outcome correlation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + weight_cv=0.5, + seed=42, + ) + # Period-1 outcomes: weighted mean should differ from unweighted + p1 = df[df["period"] == 1] + unwt_mean = p1["outcome"].mean() + wt_mean = np.average(p1["outcome"], weights=p1["weight"]) + assert abs(wt_mean - unwt_mean) > 0.1 + + def test_informative_sampling_cross_section(self): + """Cross-section informative sampling: per-period negative correlation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + weight_cv=0.5, + panel=False, + seed=42, + ) + # Check correlation for period 1 + p1 = df[df["period"] == 1] + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr < -0.1 + + def test_heterogeneous_te_by_strata(self): + """Unweighted mean TE should differ from population ATT.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + heterogeneous_te_by_strata=True, + strata_sizes=[400, 200, 200, 100, 100], + return_true_population_att=True, + seed=42, + ) + treated = df[df["treated"] == 1] + unwt_mean_te = treated["true_effect"].mean() + pop_att = df.attrs["dgp_truth"]["population_att"] + # With unequal strata sizes + heterogeneous TE, these should differ + assert abs(unwt_mean_te - pop_att) > 0.01 + + def test_heterogeneous_te_single_stratum(self): + """n_strata=1 with heterogeneous TE should not crash.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=50, + n_strata=1, + psu_per_stratum=8, + fpc_per_stratum=200.0, + heterogeneous_te_by_strata=True, + seed=42, + ) + treated = df[df["treated"] == 1] + # All treated units should have the base treatment_effect + assert np.allclose(treated["true_effect"].unique(), [2.0], atol=0.01) + + def test_return_true_population_att(self): + """dgp_truth dict should have expected keys and reasonable values.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=200, + return_true_population_att=True, + seed=42, + ) + truth = df.attrs["dgp_truth"] + assert "population_att" in truth + assert "deff_kish" in truth + assert "stratum_effects" in truth + assert "icc_realized" in truth + assert truth["deff_kish"] >= 1.0 + assert truth["icc_realized"] >= 0.0 + + def test_strata_sizes(self): + """Custom strata_sizes should produce correct per-stratum counts.""" + from diff_diff.prep_dgp import generate_survey_did_data + + sizes = [60, 50, 40, 30, 20] + df = generate_survey_did_data( + n_units=200, strata_sizes=sizes, seed=42 + ) + for s, expected in enumerate(sizes): + actual = df[df["period"] == 1]["stratum"].value_counts().get(s, 0) + assert actual == expected + + def test_strata_sizes_sum_mismatch(self): + """strata_sizes must sum to n_units.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="strata_sizes must sum"): + generate_survey_did_data( + n_units=200, strata_sizes=[50, 50, 50, 50, 49], seed=42 + ) + + def test_backward_compatibility(self): + """Default params with same seed produce identical DataFrames.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df1 = generate_survey_did_data(seed=123) + df2 = generate_survey_did_data(seed=123) + pd.testing.assert_frame_equal(df1, df2) From 808d7aa2d7aa21bfae7972eab68fceef2bc84fc5 Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 14:58:27 -0400 Subject: [PATCH 02/10] Address AI review: fix pweight semantics, ICC diagnostic, weight_cv validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reverse _rank_pair_weights() so higher Y(0) → heavier weight (w=1/pi) - Replace icc_realized with ANOVA ICC on period-1 data (matches icc solver) - Add np.isfinite() guard on weight_cv to reject NaN/Inf - Fix test assertions: positive correlation, ICC regression check, NaN/Inf test Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 40 ++++++++++++++++++++++++++-------------- tests/test_prep.py | 22 +++++++++++++++++++--- 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 48f43974..1efb8163 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1137,8 +1137,9 @@ def _rank_pair_weights( ) -> None: """Rank-pair weights with Y(0) within each stratum (in-place). - High-outcome units receive lower weights, modeling informative sampling - where hard-to-reach (high-outcome) subpopulations are under-covered. + High-outcome units receive higher weights, modeling informative sampling + where hard-to-reach (high-outcome) subpopulations are under-covered + and therefore carry larger inverse-selection-probability weights. """ for s in range(n_strata): mask = unit_stratum == s @@ -1148,14 +1149,14 @@ def _rank_pair_weights( idx_s = np.where(mask)[0] w_vals = unit_weight[idx_s].copy() if w_vals.std() < 1e-10: - # No weight variation: create inverse-rank weights, mean=1 - ranks = np.argsort(np.argsort(y0[idx_s])) - inv_rank = (n_s - ranks).astype(float) - unit_weight[idx_s] = inv_rank / inv_rank.mean() + # No weight variation: create rank-based weights, mean=1 + # Higher Y(0) → higher rank → heavier weight + ranks = np.argsort(np.argsort(y0[idx_s])).astype(float) + 1.0 + unit_weight[idx_s] = ranks / ranks.mean() else: - # Rank-pair: highest Y(0) gets lightest weight + # Rank-pair: highest Y(0) gets heaviest weight y0_order = np.argsort(-y0[idx_s]) - w_sorted = np.sort(w_vals) + w_sorted = np.sort(w_vals)[::-1] # heaviest first unit_weight[idx_s[y0_order]] = w_sorted @@ -1365,8 +1366,10 @@ def generate_survey_did_data( ) if weight_cv is not None: - if weight_cv <= 0: - raise ValueError(f"weight_cv must be positive, got {weight_cv}") + if not np.isfinite(weight_cv) or weight_cv <= 0: + raise ValueError( + f"weight_cv must be finite and positive, got {weight_cv}" + ) if weight_variation != "moderate": raise ValueError( "Cannot specify both weight_cv and a non-default " @@ -1615,10 +1618,19 @@ def generate_survey_did_data( cv_w = float(w_all.std() / w_all.mean()) if w_all.mean() > 0 else 0.0 deff_kish = 1 + cv_w**2 - # Realized ICC (between-PSU / total variance ratio) - psu_means = df.groupby("psu")["outcome"].mean() - total_var = df["outcome"].var() - icc_realized = float(psu_means.var() / total_var) if total_var > 0 else 0.0 + # Realized ICC (ANOVA-based, period-1 only to avoid TE contamination) + _p1 = df[df["period"] == 1] + _groups = _p1.groupby("psu")["outcome"] + _n_total = len(_p1) + _n_groups = _groups.ngroups + _n_bar = _n_total / _n_groups + _grand_mean = _p1["outcome"].mean() + _ssb = (_groups.size() * (_groups.mean() - _grand_mean) ** 2).sum() + _msb = _ssb / max(_n_groups - 1, 1) + _ssw = _groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + _msw = _ssw / max(_n_total - _n_groups, 1) + _denom = _msb + (_n_bar - 1) * _msw + icc_realized = float((_msb - _msw) / _denom) if _denom > 0 else 0.0 df.attrs["dgp_truth"] = { "population_att": population_att, diff --git a/tests/test_prep.py b/tests/test_prep.py index 7ae4ba46..5868df65 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1568,6 +1568,15 @@ def test_weight_cv_and_weight_variation_conflict(self): weight_cv=0.5, weight_variation="high", seed=42 ) + def test_weight_cv_nan_inf(self): + """weight_cv must reject NaN and Inf.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="weight_cv must be finite"): + generate_survey_did_data(weight_cv=np.nan, seed=42) + with pytest.raises(ValueError, match="weight_cv must be finite"): + generate_survey_did_data(weight_cv=np.inf, seed=42) + def test_informative_sampling_panel(self): """Informative sampling should create weight-outcome correlation.""" from diff_diff.prep_dgp import generate_survey_did_data @@ -1585,7 +1594,11 @@ def test_informative_sampling_panel(self): assert abs(wt_mean - unwt_mean) > 0.1 def test_informative_sampling_cross_section(self): - """Cross-section informative sampling: per-period negative correlation.""" + """Cross-section informative sampling: per-period positive correlation. + + Under w_i = 1/pi_i, under-covered (high-outcome) units get heavier + weights, so weight and outcome should be positively correlated. + """ from diff_diff.prep_dgp import generate_survey_did_data df = generate_survey_did_data( @@ -1598,7 +1611,7 @@ def test_informative_sampling_cross_section(self): # Check correlation for period 1 p1 = df[df["period"] == 1] corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] - assert corr < -0.1 + assert corr > 0.1 def test_heterogeneous_te_by_strata(self): """Unweighted mean TE should differ from population ATT.""" @@ -1638,7 +1651,8 @@ def test_return_true_population_att(self): from diff_diff.prep_dgp import generate_survey_did_data df = generate_survey_did_data( - n_units=200, + n_units=1000, + icc=0.3, return_true_population_att=True, seed=42, ) @@ -1649,6 +1663,8 @@ def test_return_true_population_att(self): assert "icc_realized" in truth assert truth["deff_kish"] >= 1.0 assert truth["icc_realized"] >= 0.0 + # icc_realized should track the target ICC (ANOVA-based, same formula) + assert abs(truth["icc_realized"] - 0.3) / 0.3 < 0.50 def test_strata_sizes(self): """Custom strata_sizes should produce correct per-stratum counts.""" From 5c3798a69d0e924a49f9bac0b701828cdc2ba3ce Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 15:12:55 -0400 Subject: [PATCH 03/10] Address AI review round 2: stratum weight preservation, docstring, validation - Scale rank weights by stratum mean to preserve weight_variation structure - Update informative_sampling and icc_realized docstrings to match code - Add integer type validation for strata_sizes - Add tests: default weight_variation path, positive correlation, float rejection Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 26 ++++++++++++++++---------- tests/test_prep.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 1efb8163..f98ba0e7 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1149,10 +1149,10 @@ def _rank_pair_weights( idx_s = np.where(mask)[0] w_vals = unit_weight[idx_s].copy() if w_vals.std() < 1e-10: - # No weight variation: create rank-based weights, mean=1 - # Higher Y(0) → higher rank → heavier weight + # No within-stratum variation: create rank-based weights + # scaled to preserve stratum baseline weight level ranks = np.argsort(np.argsort(y0[idx_s])).astype(float) + 1.0 - unit_weight[idx_s] = ranks / ranks.mean() + unit_weight[idx_s] = ranks / ranks.mean() * w_vals.mean() else: # Rank-pair: highest Y(0) gets heaviest weight y0_order = np.argsort(-y0[idx_s]) @@ -1265,11 +1265,12 @@ def generate_survey_did_data( Cannot be combined with a non-default ``weight_variation``. informative_sampling : bool, default=False If True, sampling weights correlate with Y(0) — high-outcome units - receive lower weights (under-coverage). Uses rank-pairing within - each stratum. For panel data, ranking is done once from period-1 - outcomes. For repeated cross-sections, ranking is refreshed each - period. When ``weight_variation="none"`` and no ``weight_cv``, - creates inverse-rank weights normalized to mean 1. + receive higher weights (under-coverage → larger inverse-selection- + probability weights). Uses rank-pairing within each stratum. For + panel data, ranking is done once from period-1 outcomes. For + repeated cross-sections, ranking is refreshed each period. Within + each stratum, rank-based weights are scaled to preserve the + stratum's baseline weight level from ``weight_variation``. heterogeneous_te_by_strata : bool, default=False If True, treatment effect varies by stratum: ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap @@ -1282,8 +1283,8 @@ def generate_survey_did_data( If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` with keys: ``population_att`` (weight-weighted average of treated true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects`` - (dict mapping stratum index to TE), ``icc_realized`` (between/total - variance ratio from generated data). + (dict mapping stratum index to TE), ``icc_realized`` (ANOVA-based + ICC computed on period-1 data). Returns ------- @@ -1378,6 +1379,11 @@ def generate_survey_did_data( if strata_sizes is not None: strata_sizes = list(strata_sizes) + for ss in strata_sizes: + if isinstance(ss, bool) or not isinstance(ss, (int, np.integer)): + raise ValueError( + f"strata_sizes must contain integers, got {ss!r}" + ) if len(strata_sizes) != n_strata: raise ValueError( f"strata_sizes must have length n_strata={n_strata}, " diff --git a/tests/test_prep.py b/tests/test_prep.py index 5868df65..47ba5a9f 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1592,6 +1592,32 @@ def test_informative_sampling_panel(self): unwt_mean = p1["outcome"].mean() wt_mean = np.average(p1["outcome"], weights=p1["weight"]) assert abs(wt_mean - unwt_mean) > 0.1 + # Positive correlation: higher outcome → heavier weight + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + + def test_informative_sampling_default_weights(self): + """Informative sampling preserves stratum-level weight structure.""" + from diff_diff.prep_dgp import generate_survey_did_data + + # Generate with informative_sampling but default weight_variation + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + seed=42, + ) + # Reference: expected stratum mean weights from weight_variation="moderate" + # Formula: 1.0 + 1.0 * (s / max(n_strata-1, 1)) for s=0..4 + p1 = df[df["period"] == 1] + for s in range(5): + expected_mean = 1.0 + 1.0 * (s / 4) + stratum_weights = p1.loc[p1["stratum"] == s, "weight"] + assert abs(stratum_weights.mean() - expected_mean) < 0.15, ( + f"Stratum {s}: expected mean ~{expected_mean}, " + f"got {stratum_weights.mean():.3f}" + ) + # Within-stratum variation should exist (informative sampling) + assert stratum_weights.std() > 0.01 def test_informative_sampling_cross_section(self): """Cross-section informative sampling: per-period positive correlation. @@ -1687,6 +1713,15 @@ def test_strata_sizes_sum_mismatch(self): n_units=200, strata_sizes=[50, 50, 50, 50, 49], seed=42 ) + def test_strata_sizes_float_rejected(self): + """strata_sizes must contain integers, not floats.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="strata_sizes must contain integers"): + generate_survey_did_data( + n_units=200, strata_sizes=[40.0, 40.0, 40.0, 40.0, 40.0], seed=42 + ) + def test_backward_compatibility(self): """Default params with same seed produce identical DataFrames.""" from diff_diff.prep_dgp import generate_survey_did_data From 7c6b16002f7fe34623548f2fca81f411384a161f Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 15:47:58 -0400 Subject: [PATCH 04/10] Address AI review round 3: ICC covariate variance, DGP REGISTRY notes - Include covariate variance (0.2725) in ICC calibration when add_covariates=True - Document informative_sampling structural Y(0) ranking in docstring and REGISTRY.md - Add cross-section default-weights test and ICC + covariates regression test Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 7 ++++++- docs/methodology/REGISTRY.md | 11 ++++++++++ tests/test_prep.py | 39 ++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index f98ba0e7..1b90baf8 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1271,6 +1271,8 @@ def generate_survey_did_data( repeated cross-sections, ranking is refreshed each period. Within each stratum, rank-based weights are scaled to preserve the stratum's baseline weight level from ``weight_variation``. + Ranking is based on structural Y(0) (unit FE + PSU effects), + excluding covariates from ``add_covariates``. heterogeneous_te_by_strata : bool, default=False If True, treatment effect varies by stratum: ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap @@ -1399,8 +1401,11 @@ def generate_survey_did_data( # --- ICC -> psu_re_sd resolution --- if icc is not None: + # Include covariate variance: Var(0.5*x1) + Var(0.3*x2) + # where x1 ~ N(0,1), x2 ~ Bernoulli(0.5) + cov_var = (0.25 + 0.09 * 0.25) if add_covariates else 0.0 psu_re_sd = np.sqrt( - icc * (unit_fe_sd**2 + noise_sd**2) + icc * (unit_fe_sd**2 + noise_sd**2 + cov_var) / ((1 - icc) * (1 + psu_period_factor**2)) ) diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 14ad53f1..75e249f9 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2515,6 +2515,17 @@ The 8-step workflow in `docs/llms-practitioner.txt` is adapted from Baker et al. covariates). Paper's Step 8 is "Keep learning." The mandatory with/without covariate comparison is a diff-diff convention. +### Survey DGP (`generate_survey_did_data`) + +- **Note:** The `icc` parameter calibrates `psu_re_sd` using the variance + decomposition `Var(Y) = sigma²_psu * (1 + psu_period_factor²) + sigma²_unit + + sigma²_noise + sigma²_cov`. When `add_covariates=True`, the covariate variance + `Var(0.5*x1) + Var(0.3*x2) = 0.2725` is included in the calibration. +- **Note:** Defensive enhancement: `informative_sampling` ranks units on structural + Y(0) (unit FE + PSU effects + time trend), excluding covariate contributions from + `add_covariates`. This models selection on structural characteristics (geography, + demographics) rather than residual variation, matching real survey sampling frames. + --- # Version History diff --git a/tests/test_prep.py b/tests/test_prep.py index 47ba5a9f..3d50137d 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1639,6 +1639,45 @@ def test_informative_sampling_cross_section(self): corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] assert corr > 0.1 + def test_informative_sampling_cross_section_default_weights(self): + """Cross-section informative sampling with default weight_variation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + panel=False, + seed=42, + ) + p1 = df[df["period"] == 1] + for s in range(5): + expected_mean = 1.0 + 1.0 * (s / 4) + stratum_weights = p1.loc[p1["stratum"] == s, "weight"] + assert abs(stratum_weights.mean() - expected_mean) < 0.15 + assert stratum_weights.std() > 0.01 + + def test_icc_with_covariates(self): + """ICC calibration should account for covariate variance.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_icc = 0.3 + df = generate_survey_did_data( + n_units=1000, icc=target_icc, add_covariates=True, seed=42 + ) + # ANOVA-based ICC on period 1 + p1 = df[df["period"] == 1] + groups = p1.groupby("psu")["outcome"] + grand_mean = p1["outcome"].mean() + n_total = len(p1) + n_groups = groups.ngroups + n_bar = n_total / n_groups + ssb = (groups.size() * (groups.mean() - grand_mean) ** 2).sum() + msb = ssb / (n_groups - 1) + ssw = groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + msw = ssw / (n_total - n_groups) + realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) + assert abs(realized_icc - target_icc) / target_icc < 0.50 + def test_heterogeneous_te_by_strata(self): """Unweighted mean TE should differ from population ATT.""" from diff_diff.prep_dgp import generate_survey_did_data From 11d85015e332e2905241285bd987a578c51de56b Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 16:11:47 -0400 Subject: [PATCH 05/10] Include covariates in informative sampling Y(0) ranking - Pre-draw covariates before ranking step for both panel and cross-section paths when add_covariates=True, include in Y(0) for rank-pairing - Update docstrings: icc formula includes covariate variance, informative sampling includes covariates in ranking - Update REGISTRY.md: remove structural-only deviation, document full covariate inclusion - Add panel + cross-section tests for informative_sampling + add_covariates Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 42 +++++++++++++++++++++++++----------- docs/methodology/REGISTRY.md | 10 ++++----- tests/test_prep.py | 34 +++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 1b90baf8..9eebf465 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1256,8 +1256,9 @@ def generate_survey_did_data( icc : float, optional Target intra-class correlation coefficient (0 < icc < 1). Overrides ``psu_re_sd`` via the variance decomposition: - ``psu_re_sd = sqrt(icc * (unit_fe_sd^2 + noise_sd^2) / - ((1 - icc) * (1 + psu_period_factor^2)))``. + ``psu_re_sd = sqrt(icc * (sigma2_unit + sigma2_noise + sigma2_cov) / + ((1 - icc) * (1 + psu_period_factor^2)))`` where ``sigma2_cov`` + includes covariate variance when ``add_covariates=True``. Cannot be combined with a non-default ``psu_re_sd``. weight_cv : float, optional Target coefficient of variation for sampling weights. Generates @@ -1271,8 +1272,8 @@ def generate_survey_did_data( repeated cross-sections, ranking is refreshed each period. Within each stratum, rank-based weights are scaled to preserve the stratum's baseline weight level from ``weight_variation``. - Ranking is based on structural Y(0) (unit FE + PSU effects), - excluding covariates from ``add_covariates``. + When ``add_covariates=True``, covariate contributions are + included in the Y(0) ranking. heterogeneous_te_by_strata : bool, default=False If True, treatment effect varies by stratum: ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap @@ -1489,6 +1490,10 @@ def generate_survey_did_data( + psu_period_re[unit_psu, 0] + 0.5 ) + if add_covariates: + _panel_x1 = rng.normal(0, 1, size=n_units) + _panel_x2 = rng.choice([0, 1], size=n_units) + y0_period1 = y0_period1 + 0.5 * _panel_x1 + 0.3 * _panel_x2 _rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata) # Save base weights for cross-section informative sampling (reset each period) @@ -1523,6 +1528,10 @@ def generate_survey_did_data( # Cross-section informative sampling: re-rank weights each period if informative_sampling and not panel: + # Draw covariates early so they can be included in Y(0) ranking + if add_covariates: + x1 = rng.normal(0, 1, size=n_units) + x2 = rng.choice([0, 1], size=n_units) unit_weight = _base_weight.copy() # type: ignore[possibly-undefined] y0_t = ( unit_fe @@ -1530,18 +1539,27 @@ def generate_survey_did_data( + psu_period_re[unit_psu, t - 1] + 0.5 * t ) + if add_covariates: + y0_t = y0_t + 0.5 * x1 + 0.3 * x2 _rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata) - x1 = rng.normal(0, 1, size=n_units) if add_covariates else None - if panel and t > 1 and add_covariates: + # Covariates — may already be drawn by informative sampling above + if informative_sampling and panel and add_covariates: + x1 = _panel_x1 # pre-drawn before loop for ranking + x2 = _panel_x2 + elif informative_sampling and not panel and add_covariates: + pass # x1, x2 already drawn in cross-section ranking block + elif add_covariates: + x1 = rng.normal(0, 1, size=n_units) + x2 = rng.choice([0, 1], size=n_units) + else: + x1 = None + x2 = None + if not informative_sampling and panel and t > 1 and add_covariates: x1 = _panel_x1 # type: ignore[possibly-undefined] - elif panel and t == 1 and add_covariates: - _panel_x1 = x1 - - x2 = rng.choice([0, 1], size=n_units) if add_covariates else None - if panel and t > 1 and add_covariates: x2 = _panel_x2 # type: ignore[possibly-undefined] - elif panel and t == 1 and add_covariates: + elif not informative_sampling and panel and t == 1 and add_covariates: + _panel_x1 = x1 _panel_x2 = x2 for i in range(n_units): diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 75e249f9..03b75746 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2517,14 +2517,14 @@ The 8-step workflow in `docs/llms-practitioner.txt` is adapted from Baker et al. ### Survey DGP (`generate_survey_did_data`) -- **Note:** The `icc` parameter calibrates `psu_re_sd` using the variance +- **Note:** The `icc` parameter calibrates `psu_re_sd` using the full variance decomposition `Var(Y) = sigma²_psu * (1 + psu_period_factor²) + sigma²_unit + sigma²_noise + sigma²_cov`. When `add_covariates=True`, the covariate variance `Var(0.5*x1) + Var(0.3*x2) = 0.2725` is included in the calibration. -- **Note:** Defensive enhancement: `informative_sampling` ranks units on structural - Y(0) (unit FE + PSU effects + time trend), excluding covariate contributions from - `add_covariates`. This models selection on structural characteristics (geography, - demographics) rather than residual variation, matching real survey sampling frames. +- **Note:** When `informative_sampling=True` and `add_covariates=True`, covariate + contributions are included in the Y(0) ranking used for weight assignment. + Covariates are pre-drawn before the ranking step (panel: once before the loop; + cross-section: each period) and reused in the outcome generation. --- diff --git a/tests/test_prep.py b/tests/test_prep.py index 3d50137d..16bb9e6c 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1678,6 +1678,40 @@ def test_icc_with_covariates(self): realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) assert abs(realized_icc - target_icc) / target_icc < 0.50 + def test_informative_sampling_with_covariates_panel(self): + """Informative sampling includes covariates in Y(0) ranking (panel).""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + add_covariates=True, + seed=42, + ) + p1 = df[df["period"] == 1] + # Positive weight-outcome correlation preserved with covariates + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + # Covariates should be present + assert "x1" in df.columns + assert "x2" in df.columns + + def test_informative_sampling_with_covariates_cross_section(self): + """Informative sampling includes covariates in Y(0) ranking (cross-section).""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + add_covariates=True, + panel=False, + seed=42, + ) + p1 = df[df["period"] == 1] + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + assert "x1" in df.columns + def test_heterogeneous_te_by_strata(self): """Unweighted mean TE should differ from population ATT.""" from diff_diff.prep_dgp import generate_survey_did_data From a2c235e5dccaaa2fa70dc71f00302de84e86f363 Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 16:29:03 -0400 Subject: [PATCH 06/10] Add ICC feasibility guard for zero non-PSU variance Raise ValueError when icc is set but unit_fe_sd=0, noise_sd=0, and add_covariates=False, since the target ICC is unattainable with zero non-PSU variance. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 8 +++++++- tests/test_prep.py | 9 +++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 9eebf465..b4b02356 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1405,8 +1405,14 @@ def generate_survey_did_data( # Include covariate variance: Var(0.5*x1) + Var(0.3*x2) # where x1 ~ N(0,1), x2 ~ Bernoulli(0.5) cov_var = (0.25 + 0.09 * 0.25) if add_covariates else 0.0 + non_psu_var = unit_fe_sd**2 + noise_sd**2 + cov_var + if non_psu_var < 1e-12: + raise ValueError( + "icc requires non-zero non-PSU variance " + "(unit_fe_sd, noise_sd, or add_covariates must contribute variance)" + ) psu_re_sd = np.sqrt( - icc * (unit_fe_sd**2 + noise_sd**2 + cov_var) + icc * non_psu_var / ((1 - icc) * (1 + psu_period_factor**2)) ) diff --git a/tests/test_prep.py b/tests/test_prep.py index 16bb9e6c..211e13f4 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1531,6 +1531,15 @@ def test_icc_parameter(self): realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) assert abs(realized_icc - target_icc) / target_icc < 0.50 + def test_icc_zero_variance_rejected(self): + """icc with zero non-PSU variance should raise ValueError.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="non-zero non-PSU variance"): + generate_survey_did_data( + icc=0.3, unit_fe_sd=0, noise_sd=0, add_covariates=False, seed=42 + ) + def test_icc_and_psu_re_sd_conflict(self): """Cannot specify both icc and a non-default psu_re_sd.""" from diff_diff.prep_dgp import generate_survey_did_data From 8be6fad354c480a6620a7663eda0b314eed6764a Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 16:41:24 -0400 Subject: [PATCH 07/10] Return NaN for undefined DGP truth diagnostics - population_att returns NaN when no treated units (was 0.0) - icc_realized returns NaN when period-1 has < 2 groups or no within-PSU replication (was finite sentinel) - Matches library NaN-for-undefined convention Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 22 +++++++++++++--------- tests/test_prep.py | 26 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index b4b02356..eb6b5bee 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1637,7 +1637,7 @@ def generate_survey_did_data( te_treated = df.loc[treated_mask, "true_effect"].values population_att = float(np.average(te_treated, weights=w_treated)) else: - population_att = 0.0 + population_att = float("nan") if te_by_stratum is not None: stratum_effects = { @@ -1658,14 +1658,18 @@ def generate_survey_did_data( _groups = _p1.groupby("psu")["outcome"] _n_total = len(_p1) _n_groups = _groups.ngroups - _n_bar = _n_total / _n_groups - _grand_mean = _p1["outcome"].mean() - _ssb = (_groups.size() * (_groups.mean() - _grand_mean) ** 2).sum() - _msb = _ssb / max(_n_groups - 1, 1) - _ssw = _groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() - _msw = _ssw / max(_n_total - _n_groups, 1) - _denom = _msb + (_n_bar - 1) * _msw - icc_realized = float((_msb - _msw) / _denom) if _denom > 0 else 0.0 + # ICC undefined with < 2 groups or no within-group replication + if _n_groups < 2 or _n_total <= _n_groups: + icc_realized = float("nan") + else: + _n_bar = _n_total / _n_groups + _grand_mean = _p1["outcome"].mean() + _ssb = (_groups.size() * (_groups.mean() - _grand_mean) ** 2).sum() + _msb = _ssb / (_n_groups - 1) + _ssw = _groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + _msw = _ssw / (_n_total - _n_groups) + _denom = _msb + (_n_bar - 1) * _msw + icc_realized = float((_msb - _msw) / _denom) if _denom > 0 else float("nan") df.attrs["dgp_truth"] = { "population_att": population_att, diff --git a/tests/test_prep.py b/tests/test_prep.py index 211e13f4..9414fa6d 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1774,6 +1774,32 @@ def test_return_true_population_att(self): # icc_realized should track the target ICC (ANOVA-based, same formula) assert abs(truth["icc_realized"] - 0.3) / 0.3 < 0.50 + def test_population_att_nan_no_treated(self): + """population_att should be NaN when there are no treated units.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=50, + never_treated_frac=1.0, + return_true_population_att=True, + seed=42, + ) + assert np.isnan(df.attrs["dgp_truth"]["population_att"]) + + def test_icc_realized_nan_no_replication(self): + """icc_realized should be NaN when period-1 has no within-PSU replication.""" + from diff_diff.prep_dgp import generate_survey_did_data + + # 5 units across 5 strata with 8 PSUs each = 1 unit per PSU (no replication) + df = generate_survey_did_data( + n_units=5, + n_strata=5, + psu_per_stratum=8, + return_true_population_att=True, + seed=42, + ) + assert np.isnan(df.attrs["dgp_truth"]["icc_realized"]) + def test_strata_sizes(self): """Custom strata_sizes should produce correct per-stratum counts.""" from diff_diff.prep_dgp import generate_survey_did_data From 3f065976e0e90410e6570e1c7b01469e74c649d8 Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 17:11:33 -0400 Subject: [PATCH 08/10] Add covariate_effects and te_covariate_interaction parameters - covariate_effects=(beta1, beta2) replaces hardcoded (0.5, 0.3) coefficients for x1/x2 in outcome, ICC calibration, and informative sampling ranking - te_covariate_interaction adds TE_i = base_TE + gamma * x1_i for unit-level treatment effect heterogeneity (requires add_covariates) - Both default to current behavior for backward compatibility Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 32 ++++++++++++++++++++++----- tests/test_prep.py | 51 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index eb6b5bee..9cc4b00a 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1187,6 +1187,8 @@ def generate_survey_did_data( heterogeneous_te_by_strata: bool = False, strata_sizes: Optional[List[int]] = None, return_true_population_att: bool = False, + covariate_effects: Optional[tuple] = None, + te_covariate_interaction: float = 0.0, ) -> pd.DataFrame: """ Generate synthetic staggered DiD data with survey structure. @@ -1288,6 +1290,16 @@ def generate_survey_did_data( true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects`` (dict mapping stratum index to TE), ``icc_realized`` (ANOVA-based ICC computed on period-1 data). + covariate_effects : tuple of (float, float), optional + Coefficients ``(beta1, beta2)`` for covariates x1 and x2 in the + outcome equation ``y += beta1 * x1 + beta2 * x2``. Default uses + ``(0.5, 0.3)``. Only used when ``add_covariates=True``. The ICC + calibration automatically adjusts for the implied covariate variance. + te_covariate_interaction : float, default=0.0 + Coefficient for treatment-by-covariate interaction: + ``TE_i = base_TE + te_covariate_interaction * x1_i``. Creates + unit-level treatment effect heterogeneity driven by the continuous + covariate. Requires ``add_covariates=True``. Returns ------- @@ -1400,11 +1412,19 @@ def generate_survey_did_data( f"got {sum(strata_sizes)}" ) + # --- Resolve covariate coefficients --- + _beta1, _beta2 = covariate_effects if covariate_effects is not None else (0.5, 0.3) + + if te_covariate_interaction != 0.0 and not add_covariates: + raise ValueError( + "te_covariate_interaction requires add_covariates=True" + ) + # --- ICC -> psu_re_sd resolution --- if icc is not None: - # Include covariate variance: Var(0.5*x1) + Var(0.3*x2) + # Covariate variance: Var(beta1*x1) + Var(beta2*x2) # where x1 ~ N(0,1), x2 ~ Bernoulli(0.5) - cov_var = (0.25 + 0.09 * 0.25) if add_covariates else 0.0 + cov_var = (_beta1**2 * 1.0 + _beta2**2 * 0.25) if add_covariates else 0.0 non_psu_var = unit_fe_sd**2 + noise_sd**2 + cov_var if non_psu_var < 1e-12: raise ValueError( @@ -1499,7 +1519,7 @@ def generate_survey_did_data( if add_covariates: _panel_x1 = rng.normal(0, 1, size=n_units) _panel_x2 = rng.choice([0, 1], size=n_units) - y0_period1 = y0_period1 + 0.5 * _panel_x1 + 0.3 * _panel_x2 + y0_period1 = y0_period1 + _beta1 * _panel_x1 + _beta2 * _panel_x2 _rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata) # Save base weights for cross-section informative sampling (reset each period) @@ -1546,7 +1566,7 @@ def generate_survey_did_data( + 0.5 * t ) if add_covariates: - y0_t = y0_t + 0.5 * x1 + 0.3 * x2 + y0_t = y0_t + _beta1 * x1 + _beta2 * x2 _rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata) # Covariates — may already be drawn by informative sampling above @@ -1574,7 +1594,7 @@ def generate_survey_did_data( y = unit_fe[i] + psu_re[unit_psu[i]] + psu_period_re[unit_psu[i], t - 1] + 0.5 * t if add_covariates: - y += 0.5 * x1[i] + 0.3 * x2[i] + y += _beta1 * x1[i] + _beta2 * x2[i] treated = int(g_i > 0 and t >= g_i) true_eff = 0.0 @@ -1583,6 +1603,8 @@ def generate_survey_did_data( true_eff = float(te_by_stratum[unit_stratum[i]]) else: true_eff = treatment_effect + if te_covariate_interaction != 0.0: + true_eff += te_covariate_interaction * x1[i] if dynamic_effects: true_eff *= 1 + effect_growth * (t - g_i) y += true_eff diff --git a/tests/test_prep.py b/tests/test_prep.py index 9414fa6d..4fff2668 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1837,3 +1837,54 @@ def test_backward_compatibility(self): df1 = generate_survey_did_data(seed=123) df2 = generate_survey_did_data(seed=123) pd.testing.assert_frame_equal(df1, df2) + + def test_covariate_effects_custom(self): + """Custom covariate coefficients should change outcome variance.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df_default = generate_survey_did_data( + n_units=500, add_covariates=True, seed=42 + ) + df_large = generate_survey_did_data( + n_units=500, add_covariates=True, + covariate_effects=(2.0, 1.0), seed=42, + ) + # Larger coefficients → larger outcome variance + assert df_large["outcome"].var() > df_default["outcome"].var() + + def test_covariate_effects_zero(self): + """Zero covariate effects should produce same variance as no covariates.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df_no_cov = generate_survey_did_data( + n_units=500, add_covariates=False, seed=42 + ) + df_zero = generate_survey_did_data( + n_units=500, add_covariates=True, + covariate_effects=(0.0, 0.0), seed=42, + ) + # Outcome variance should be similar (covariates contribute nothing) + assert abs(df_zero["outcome"].var() - df_no_cov["outcome"].var()) < 0.5 + + def test_te_covariate_interaction(self): + """Covariate interaction should create unit-level TE heterogeneity.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=500, + add_covariates=True, + te_covariate_interaction=1.0, + seed=42, + ) + treated = df[df["treated"] == 1] + # true_effect should vary across treated units (not constant) + assert treated["true_effect"].std() > 0.1 + + def test_te_covariate_interaction_requires_covariates(self): + """te_covariate_interaction without add_covariates should raise.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="te_covariate_interaction requires"): + generate_survey_did_data( + te_covariate_interaction=0.5, add_covariates=False, seed=42 + ) From 21c25f10eb1c251eeaa89568184a71ce8de620e6 Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 17:37:47 -0400 Subject: [PATCH 09/10] Address AI review: validate new params, rename stratum_effects - Add type/length/finiteness validation for covariate_effects and te_covariate_interaction - Rename stratum_effects to base_stratum_effects in dgp_truth to clarify these are base TEs before dynamic/covariate modifiers - Add validation rejection tests for malformed inputs Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 21 ++++++++++++++++++--- tests/test_prep.py | 28 +++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 9cc4b00a..2edf45bf 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1288,7 +1288,8 @@ def generate_survey_did_data( If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` with keys: ``population_att`` (weight-weighted average of treated true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects`` - (dict mapping stratum index to TE), ``icc_realized`` (ANOVA-based + (base stratum TEs before dynamic/covariate modifiers), + ``icc_realized`` (ANOVA-based ICC computed on period-1 data). covariate_effects : tuple of (float, float), optional Coefficients ``(beta1, beta2)`` for covariates x1 and x2 in the @@ -1412,9 +1413,23 @@ def generate_survey_did_data( f"got {sum(strata_sizes)}" ) - # --- Resolve covariate coefficients --- + # --- Validate and resolve covariate coefficients --- + if covariate_effects is not None: + covariate_effects = tuple(covariate_effects) + if len(covariate_effects) != 2: + raise ValueError( + f"covariate_effects must have length 2, got {len(covariate_effects)}" + ) + if not all(np.isfinite(c) for c in covariate_effects): + raise ValueError( + f"covariate_effects must be finite, got {covariate_effects}" + ) _beta1, _beta2 = covariate_effects if covariate_effects is not None else (0.5, 0.3) + if not np.isfinite(te_covariate_interaction): + raise ValueError( + f"te_covariate_interaction must be finite, got {te_covariate_interaction}" + ) if te_covariate_interaction != 0.0 and not add_covariates: raise ValueError( "te_covariate_interaction requires add_covariates=True" @@ -1696,7 +1711,7 @@ def generate_survey_did_data( df.attrs["dgp_truth"] = { "population_att": population_att, "deff_kish": float(deff_kish), - "stratum_effects": stratum_effects, + "base_stratum_effects": stratum_effects, "icc_realized": icc_realized, } diff --git a/tests/test_prep.py b/tests/test_prep.py index 4fff2668..fb26c5f7 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1767,7 +1767,7 @@ def test_return_true_population_att(self): truth = df.attrs["dgp_truth"] assert "population_att" in truth assert "deff_kish" in truth - assert "stratum_effects" in truth + assert "base_stratum_effects" in truth assert "icc_realized" in truth assert truth["deff_kish"] >= 1.0 assert truth["icc_realized"] >= 0.0 @@ -1888,3 +1888,29 @@ def test_te_covariate_interaction_requires_covariates(self): generate_survey_did_data( te_covariate_interaction=0.5, add_covariates=False, seed=42 ) + + def test_covariate_effects_validation(self): + """covariate_effects must be length 2 and finite.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="covariate_effects must have length 2"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(1.0,), seed=42 + ) + with pytest.raises(ValueError, match="covariate_effects must be finite"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(np.nan, 0.3), seed=42 + ) + with pytest.raises(ValueError, match="covariate_effects must be finite"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(0.5, np.inf), seed=42 + ) + + def test_te_covariate_interaction_validation(self): + """te_covariate_interaction must be finite.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="te_covariate_interaction must be finite"): + generate_survey_did_data( + add_covariates=True, te_covariate_interaction=np.nan, seed=42 + ) From bdfb172f581d3e7105ac3c2101b9ad3ec2ccbd4b Mon Sep 17 00:00:00 2001 From: igerber Date: Sun, 5 Apr 2026 17:54:55 -0400 Subject: [PATCH 10/10] Address P3 documentation drift: REGISTRY formula, docstring key, ranking test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - REGISTRY.md: generalize ICC covariate variance to beta1²/beta2² formula - Docstring: fix stratum_effects → base_stratum_effects key name - Add direct covariate-ranking test: covariates dominate Y(0), verify weight assignment changes with nonzero vs zero covariate effects Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep_dgp.py | 2 +- docs/methodology/REGISTRY.md | 6 ++++-- tests/test_prep.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 2edf45bf..546a5a25 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1287,7 +1287,7 @@ def generate_survey_did_data( return_true_population_att : bool, default=False If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` with keys: ``population_att`` (weight-weighted average of treated - true effects), ``deff_kish`` (1 + CV(w)^2), ``stratum_effects`` + true effects), ``deff_kish`` (1 + CV(w)^2), ``base_stratum_effects`` (base stratum TEs before dynamic/covariate modifiers), ``icc_realized`` (ANOVA-based ICC computed on period-1 data). diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 03b75746..26c311a1 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2519,8 +2519,10 @@ The 8-step workflow in `docs/llms-practitioner.txt` is adapted from Baker et al. - **Note:** The `icc` parameter calibrates `psu_re_sd` using the full variance decomposition `Var(Y) = sigma²_psu * (1 + psu_period_factor²) + sigma²_unit + - sigma²_noise + sigma²_cov`. When `add_covariates=True`, the covariate variance - `Var(0.5*x1) + Var(0.3*x2) = 0.2725` is included in the calibration. + sigma²_noise + sigma²_cov`. When `add_covariates=True`, covariate variance + `sigma²_cov = beta1² * Var(x1) + beta2² * Var(x2)` is included, where + `(beta1, beta2)` defaults to `(0.5, 0.3)` but is configurable via + `covariate_effects`. - **Note:** When `informative_sampling=True` and `add_covariates=True`, covariate contributions are included in the Y(0) ranking used for weight assignment. Covariates are pre-drawn before the ranking step (panel: once before the loop; diff --git a/tests/test_prep.py b/tests/test_prep.py index fb26c5f7..0674186a 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1721,6 +1721,41 @@ def test_informative_sampling_with_covariates_cross_section(self): assert corr > 0.1 assert "x1" in df.columns + def test_informative_sampling_covariate_ranking_direct(self): + """Verify covariates actually affect weight assignment in ranking. + + Use large covariate effects with tiny unit_fe_sd/psu_re_sd so + covariates dominate Y(0). Weights with nonzero vs zero covariate + effects should differ. + """ + from diff_diff.prep_dgp import generate_survey_did_data + + # Covariates dominate: large beta, tiny structural variance + df_with = generate_survey_did_data( + n_units=200, + informative_sampling=True, + add_covariates=True, + covariate_effects=(5.0, 0.0), + unit_fe_sd=0.01, + psu_re_sd=0.01, + noise_sd=0.01, + seed=42, + ) + df_without = generate_survey_did_data( + n_units=200, + informative_sampling=True, + add_covariates=True, + covariate_effects=(0.0, 0.0), + unit_fe_sd=0.01, + psu_re_sd=0.01, + noise_sd=0.01, + seed=42, + ) + # Weight assignments should differ when covariates dominate ranking + w_with = df_with[df_with["period"] == 1]["weight"].values + w_without = df_without[df_without["period"] == 1]["weight"].values + assert not np.allclose(w_with, w_without, atol=0.01) + def test_heterogeneous_te_by_strata(self): """Unweighted mean TE should differ from population ATT.""" from diff_diff.prep_dgp import generate_survey_did_data