Skip to content

Commit b9985e9

Browse files
author
miranov25
committed
Phase 13.9.GB Extensions: agg_columns, WLS fix, fit_intercept fix, lean output
Feature: agg_columns parameter for COG/window statistics - mean/std of user-specified columns within SW window - All 3 paths: zerocopy, V3 incremental, V5 numba - Canonical pattern: agg_columns = gb_columns + linear_columns Bug #1 (P0): weights silently ignored in regression — fixed Bug #2 (P0): fit_intercept=False produced intercept columns — fixed Interface: removed default fit_column stats (architect decision) - 52 → ~28 columns for 4-target fits - Use agg_columns to opt-in Tests: 15 new, 6 files updated 462 passed, 3 failed (pre-existing), 19 skipped, 0 regressions Reviewed-by: Claude14, GPT7, GPT10, Claude (Opus)
1 parent 17f266f commit b9985e9

7 files changed

Lines changed: 130 additions & 123 deletions

UTILS/dfextensions/groupby_regression/groupby_regression_sliding_window.py

Lines changed: 61 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -500,29 +500,7 @@ def _aggregate_window_zerocopy(
500500
w_valid = None
501501

502502
for t in fit_columns:
503-
y = target_arrays[t][idx_unique]
504-
y_finite = np.isfinite(y)
505-
506-
if weights is None:
507-
x = y[y_finite]
508-
mean, std = _weighted_mean_std(x, None)
509-
else:
510-
valid = y_finite & w_valid
511-
x = y[valid]
512-
ww = w_win[valid]
513-
mean, std = _weighted_mean_std(x, ww)
514-
515-
n_finite = int(np.sum(y_finite))
516-
if n_finite > 0:
517-
median = float(np.median(y[y_finite]))
518-
else:
519-
median = np.nan
520-
stats[t] = {
521-
"mean": mean,
522-
"std": std,
523-
"median": median,
524-
"entries": n_finite,
525-
}
503+
stats[t] = {} # no per-target stats — use agg_columns if needed
526504

527505
# Aggregate extra columns (COG etc.)
528506
if _agg_cols:
@@ -548,7 +526,7 @@ def _aggregate_window_zerocopy(
548526
agg_st[c] = {"mean": mean, "std": std, "median": median}
549527
else:
550528
for t in fit_columns:
551-
stats[t] = {"mean": np.nan, "std": np.nan, "median": np.nan, "entries": 0}
529+
stats[t] = {}
552530
if _agg_cols:
553531
agg_st = {c: {"mean": np.nan, "std": np.nan, "median": np.nan} for c in _agg_cols}
554532

@@ -2421,20 +2399,15 @@ def _assemble_results_v5(
24212399

24222400
pred_names = [_sanitize_suffix(p) for p in linear_columns]
24232401

2424-
# Per-target columns
2402+
# Per-target columns — fit results only (no mean/std/median/entries/r_squared)
24252403
for tgt in fit_columns:
24262404
s = suffix
2427-
data[f'{tgt}_mean{s}'] = v5_arrays[f'{tgt}_mean']
2428-
data[f'{tgt}_std{s}'] = v5_arrays[f'{tgt}_std']
2429-
data[f'{tgt}_median{s}'] = np.full(n_bins, np.nan) # Cannot compute from sufficient stats
2430-
data[f'{tgt}_entries{s}'] = v5_arrays[f'{tgt}_entries']
24312405
if fit_intercept:
24322406
data[f'{tgt}_intercept{s}'] = v5_arrays[f'{tgt}_intercept']
24332407
data[f'{tgt}_intercept_err{s}'] = v5_arrays[f'{tgt}_intercept_err']
24342408
for pname in pred_names:
24352409
data[f'{tgt}_slope_{pname}{s}'] = v5_arrays[f'{tgt}_slope_{pname}']
24362410
data[f'{tgt}_slope_{pname}_err{s}'] = v5_arrays[f'{tgt}_slope_{pname}_err']
2437-
data[f'{tgt}_r_squared{s}'] = v5_arrays[f'{tgt}_r_squared']
24382411
data[f'{tgt}_rmse{s}'] = v5_arrays[f'{tgt}_rmse']
24392412
data[f'{tgt}_n_fitted{s}'] = v5_arrays[f'{tgt}_n_fitted']
24402413

@@ -2489,12 +2462,8 @@ def _assemble_results(
24892462
if agg_median:
24902463
base[f"{c}_median"] = np.nan
24912464

2492-
# Aggregate stats (fit_columns)
2493-
for t, st in ar.stats.items():
2494-
base[f"{t}_mean"] = st["mean"]
2495-
base[f"{t}_std"] = st["std"]
2496-
base[f"{t}_median"] = st["median"]
2497-
base[f"{t}_entries"] = st["entries"]
2465+
# fit_columns stats (mean/std/median/entries) NOT emitted by default.
2466+
# Use agg_columns to opt-in if needed.
24982467

24992468
# Fit outputs
25002469
fit_map = fit_results.get(ar.center, {})
@@ -2514,7 +2483,6 @@ def _assemble_results(
25142483
for p, ps in pred_suffixes.items():
25152484
base[f"{t}_slope_{ps}"] = np.nan
25162485
base[f"{t}_slope_{ps}_err"] = np.nan
2517-
base[f"{t}_r_squared"] = np.nan
25182486
base[f"{t}_rmse"] = np.nan
25192487
base[f"{t}_n_fitted"] = 0
25202488
continue
@@ -2525,7 +2493,6 @@ def _assemble_results(
25252493
for p, ps in pred_suffixes.items():
25262494
base[f"{t}_slope_{ps}"] = tres.get("coeffs", {}).get(p, np.nan)
25272495
base[f"{t}_slope_{ps}_err"] = tres.get("coeffs_err", {}).get(p, np.nan)
2528-
base[f"{t}_r_squared"] = tres.get("r_squared", np.nan)
25292496
base[f"{t}_rmse"] = tres.get("rmse", np.nan)
25302497
base[f"{t}_n_fitted"] = tres.get("n_fitted", 0)
25312498
if tres.get("quality_flag"):
@@ -2543,17 +2510,15 @@ def _assemble_results(
25432510
if dim not in out.columns:
25442511
out[dim] = pd.Series(dtype="int64")
25452512

2546-
# Order columns: gb_columns -> agg_columns stats -> fit aggregations -> fit outputs -> diagnostics
2513+
# Order columns: gb_columns -> agg_columns stats -> fit outputs -> diagnostics
25472514
extra_agg_cols = []
25482515
for c in _agg_cols:
25492516
extra_agg_cols.append(f"{c}_mean")
25502517
extra_agg_cols.append(f"{c}_std")
25512518
if agg_median:
25522519
extra_agg_cols.append(f"{c}_median")
25532520

2554-
agg_cols = [c for c in out.columns if any(c.startswith(f"{t}_") for t in fit_columns) and (
2555-
c.endswith("_mean") or c.endswith("_std") or c.endswith("_median") or c.endswith("_entries")
2556-
)]
2521+
agg_cols = [] # no fit_column stats in default output
25572522

25582523
fit_cols = []
25592524
for t in fit_columns:
@@ -2563,7 +2528,6 @@ def _assemble_results(
25632528
for p, ps in pred_suffixes.items():
25642529
fit_cols.append(f"{t}_slope_{ps}")
25652530
fit_cols.append(f"{t}_slope_{ps}_err")
2566-
fit_cols.append(f"{t}_r_squared")
25672531
fit_cols.append(f"{t}_rmse")
25682532
fit_cols.append(f"{t}_n_fitted")
25692533

@@ -2790,11 +2754,15 @@ def make_sliding_window_fit(
27902754
within each sliding window. Useful for computing center-of-gravity
27912755
of groupby variables or predictor columns.
27922756
2793-
Example: To get the data COG for the groupby coordinates::
2757+
Recommended: COG of groupby + predictor columns::
27942758
2795-
agg_columns = ['mpt', 'vertex_z', 'tgl', 'phi']
2759+
agg_columns = gb_columns + linear_columns
27962760
2797-
Output: ``mpt_mean_sw``, ``mpt_std_sw``, ``vertex_z_mean_sw``, etc.
2761+
To also get fit_column statistics (mean/std), add them explicitly::
2762+
2763+
agg_columns = gb_columns + linear_columns + fit_columns
2764+
2765+
Output: ``{col}_mean_sw``, ``{col}_std_sw`` for each agg_column.
27982766
27992767
When kernel is non-uniform (e.g. ``kernel='gaussian'``), mean and
28002768
std are kernel-weighted. Median (if enabled) is always unweighted.
@@ -2923,46 +2891,68 @@ def make_sliding_window_fit(
29232891
if agg_columns:
29242892
_agg_cols_v5 = agg_columns
29252893
_agg_arrays_v5 = {c: df[c].to_numpy(dtype=np.float64) for c in _agg_cols_v5}
2926-
# Build per-bin row lists from bin_ids
2894+
n_dims = len(gb_columns)
2895+
2896+
# Build per-bin row lists from bin_ids — O(n_rows) once
29272897
_bin_rows_v5: Dict[int, np.ndarray] = {}
29282898
for bi in range(_n_bins):
29292899
_bin_rows_v5[bi] = np.where(bin_ids == bi)[0]
29302900

2931-
# For each center bin, aggregate agg_columns over its window
2901+
# O(1) coord→bin_index lookup
2902+
coord_to_bin: Dict[Tuple[int, ...], int] = {
2903+
tuple(int(_bin_coords[i, d]) for d in range(n_dims)): i
2904+
for i in range(_n_bins)
2905+
}
2906+
2907+
# Pre-allocate output arrays
2908+
_agg_out: Dict[str, np.ndarray] = {}
2909+
for c in _agg_cols_v5:
2910+
_agg_out[f'{c}_mean'] = np.full(_n_bins, np.nan, dtype=np.float64)
2911+
_agg_out[f'{c}_std'] = np.full(_n_bins, np.nan, dtype=np.float64)
2912+
if agg_median:
2913+
_agg_out[f'{c}_median'] = np.full(_n_bins, np.nan, dtype=np.float64)
2914+
29322915
for bi in range(_n_bins):
2933-
# Collect row indices from all neighbors
2934-
center_coord = tuple(int(_bin_coords[bi, d]) for d in range(len(gb_columns)))
2935-
nbr_bins_v5 = _get_neighbor_bins_v2(
2936-
center_coord, neighbor_offsets, bounds, gb_columns,
2916+
center_coord = tuple(int(_bin_coords[bi, d]) for d in range(n_dims))
2917+
nbr_coords, valid_oi = _get_neighbor_bins_v2(
2918+
center_coord, neighbor_offsets, bounds,
29372919
boundary_resolved, full_window_spec)
2920+
29382921
idx_list_v5: List[int] = []
2939-
for nb_coord in nbr_bins_v5:
2940-
# Find compact bin index for this neighbor coordinate
2941-
# Use bin_coords to map back
2942-
for bj in range(_n_bins):
2943-
if tuple(int(_bin_coords[bj, d]) for d in range(len(gb_columns))) == nb_coord:
2944-
idx_list_v5.extend(_bin_rows_v5[bj].tolist())
2945-
break
2922+
kw_list_v5: List[np.ndarray] = []
2923+
for ni, nb_coord in enumerate(nbr_coords):
2924+
bj = coord_to_bin.get(nb_coord)
2925+
if bj is not None and bj in _bin_rows_v5:
2926+
rows_j = _bin_rows_v5[bj]
2927+
idx_list_v5.extend(rows_j.tolist())
2928+
kw = float(offset_weights[valid_oi[ni]])
2929+
kw_list_v5.append(np.full(len(rows_j), kw, dtype=np.float64))
29462930

29472931
if not idx_list_v5:
2948-
for c in _agg_cols_v5:
2949-
out.loc[out.index[bi], f'{c}_mean{suffix}'] = np.nan
2950-
out.loc[out.index[bi], f'{c}_std{suffix}'] = np.nan
2951-
if agg_median:
2952-
out.loc[out.index[bi], f'{c}_median{suffix}'] = np.nan
29532932
continue
29542933

2955-
idx_v5 = np.unique(np.array(idx_list_v5, dtype=np.int64))
2934+
idx_v5 = np.array(idx_list_v5, dtype=np.int64)
2935+
kw_v5 = np.concatenate(kw_list_v5) if _is_weighted_kernel else None
2936+
29562937
for c in _agg_cols_v5:
29572938
y = _agg_arrays_v5[c][idx_v5]
29582939
y_fin = np.isfinite(y)
2959-
x = y[y_fin]
2960-
mean, std = _weighted_mean_std(x, None)
2961-
out.loc[out.index[bi], f'{c}_mean{suffix}'] = mean
2962-
out.loc[out.index[bi], f'{c}_std{suffix}'] = std
2940+
if _is_weighted_kernel and kw_v5 is not None:
2941+
x = y[y_fin]
2942+
ww = kw_v5[y_fin]
2943+
mean, std = _weighted_mean_std(x, ww)
2944+
else:
2945+
x = y[y_fin]
2946+
mean, std = _weighted_mean_std(x, None)
2947+
2948+
_agg_out[f'{c}_mean'][bi] = mean
2949+
_agg_out[f'{c}_std'][bi] = std
29632950
if agg_median:
2964-
median = float(np.median(x)) if len(x) > 0 else np.nan
2965-
out.loc[out.index[bi], f'{c}_median{suffix}'] = median
2951+
_agg_out[f'{c}_median'][bi] = float(np.median(x)) if len(x) > 0 else np.nan
2952+
2953+
# Assign columns to DataFrame at once
2954+
for key, arr in _agg_out.items():
2955+
out[f'{key}{suffix}'] = arr
29662956

29672957
if verbose:
29682958
print(f"[V5] Assembly: {time.time()-t_asm:.4f}s")

UTILS/dfextensions/groupby_regression/tests/test_agg_columns.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,35 @@ def test_agg_columns_v5_matches_zerocopy(sample_df):
247247
merged[f'{col}_std_sw_inc'].values,
248248
rtol=1e-10, atol=1e-12,
249249
err_msg=f"{col}_std differs between recompute and incremental")
250+
251+
252+
# ── Test 7: Default output has no fit_column stats ──
253+
254+
def test_default_no_fit_stats(sample_df):
255+
"""Default output has no {t}_mean, {t}_std, {t}_median, {t}_entries, {t}_r_squared."""
256+
result = make_sliding_window_fit(**_base_kwargs(sample_df))
257+
258+
for col in result.columns:
259+
assert not col.endswith('_mean_sw'), f"Unexpected fit stat column: {col}"
260+
assert not col.endswith('_std_sw'), f"Unexpected fit stat column: {col}"
261+
assert not col.endswith('_median_sw'), f"Unexpected fit stat column: {col}"
262+
assert not col.endswith('_entries_sw'), f"Unexpected fit stat column: {col}"
263+
assert '_r_squared_' not in col, f"Unexpected r_squared column: {col}"
264+
265+
# Fit results should still be present
266+
assert 'target_intercept_sw' in result.columns
267+
assert 'target_slope_predictor_sw' in result.columns
268+
assert 'target_rmse_sw' in result.columns
269+
assert 'target_n_fitted_sw' in result.columns
270+
271+
272+
# ── Test 8: agg_columns restores fit_column stats ──
273+
274+
def test_agg_columns_restores_fit_stats(sample_df):
275+
"""Adding fit_column to agg_columns produces mean/std for that column."""
276+
result = make_sliding_window_fit(
277+
**_base_kwargs(sample_df, agg_columns=['target']))
278+
279+
assert 'target_mean_sw' in result.columns
280+
assert 'target_std_sw' in result.columns
281+
assert result['target_mean_sw'].notna().all()

UTILS/dfextensions/groupby_regression/tests/test_groupby_regression_sliding_window.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,14 +173,14 @@ def test_sliding_window_basic_3d_verbose():
173173
fit_columns=['value'],
174174
linear_columns=['x'],
175175
min_stat=10
176-
, suffix='')
176+
, suffix='', agg_columns=['value'])
177177

178178
assert isinstance(result, pd.DataFrame), "Result must be a DataFrame."
179179
assert {'xBin', 'yBin', 'zBin'}.issubset(result.columns), "Missing group columns."
180-
assert {'value_mean', 'value_std', 'value_entries'}.issubset(result.columns), "Missing aggregation outputs."
180+
assert {'value_mean', 'value_std'}.issubset(result.columns), "Missing aggregation outputs."
181181

182182
# Regression: ensure at least basic coefficients are present
183-
expect_any = {'value_slope_x', 'value_intercept', 'value_r_squared'}
183+
expect_any = {'value_slope_x', 'value_intercept'}
184184
assert any(c in result.columns for c in expect_any), "Missing regression outputs."
185185

186186
# Metadata presence (canonical keys)
@@ -213,12 +213,11 @@ def test_sliding_window_aggregation_verbose():
213213
fit_columns=['value'],
214214
linear_columns=[],
215215
min_stat=1
216-
, suffix='')
216+
, suffix='', agg_columns=['value'])
217217

218218
row_0 = result[(result['xBin'] == 0) & (result['yBin'] == 0) & (result['zBin'] == 0)].iloc[0]
219-
assert row_0['value_entries'] == 6, "Entries must include neighbors in x."
219+
assert row_0['n_rows_aggregated'] == 6, "Entries must include neighbors in x."
220220
assert np.isclose(row_0['value_mean'], 3.5, atol=1e-6), "Mean mismatch."
221-
assert np.isclose(row_0.get('value_median', 3.5), 3.5, atol=1e-6), "Median mismatch."
222221

223222

224223
def test_sliding_window_linear_fit_recover_slope():
@@ -531,13 +530,11 @@ def test_multi_target_fit_output_schema():
531530
df=df, gb_columns=['xBin', 'yBin', 'zBin'],
532531
window_spec={'xBin': 1, 'yBin': 1, 'zBin': 1},
533532
fit_columns=['value', 'value2'], linear_columns=['x'], min_stat=10
534-
, suffix='')
533+
, suffix='', agg_columns=['value'])
535534

536535
expected = [
537-
'value_mean', 'value_std', 'value_median', 'value_entries',
538-
'value_slope_x', 'value_intercept', 'value_r_squared',
539-
'value2_mean', 'value2_std', 'value2_median', 'value2_entries',
540-
'value2_slope_x', 'value2_intercept', 'value2_r_squared'
536+
'value_slope_x', 'value_intercept',
537+
'value2_slope_x', 'value2_intercept'
541538
]
542539
for c in expected:
543540
assert c in result.columns, f"Missing column: {c}"
@@ -578,7 +575,7 @@ def test_selection_mask_filters_pre_windowing():
578575
fit_columns=['value'], linear_columns=['x'], selection=selection
579576
, suffix='')
580577

581-
assert res_sel['value_entries'].mean() < res_all['value_entries'].mean(), \
578+
assert res_sel['n_rows_aggregated'].mean() < res_all['n_rows_aggregated'].mean(), \
582579
"Selected run must show fewer entries per bin on average."
583580

584581

@@ -771,7 +768,7 @@ def test_realistic_smoke_normalised_residuals_gate():
771768
df=df, gb_columns=['xBin', 'y2xBin', 'z2xBin'],
772769
window_spec={'xBin': 1, 'y2xBin': 1, 'z2xBin': 1},
773770
fit_columns=['value'], linear_columns=['meanIDC'], min_stat=10
774-
, suffix='')
771+
, suffix='', agg_columns=['value'])
775772

776773
# We cannot assert exact counts, but we can assert existence of entries
777774
# and that residual-related outputs (e.g., value_std) are finite.

0 commit comments

Comments
 (0)