Skip to content

Commit 917c361

Browse files
committed
Fix parallel SW: forkserver context + raise on total failure
Bug: Numba NUMBA_NUM_THREADS conflict caused silent empty DataFrame on multi-core servers. fork() inherited parent Numba thread pool. Fix: forkserver/spawn multiprocessing context (clean child processes). - forkserver → spawn → fork (fallback chain) - spawn: data pickled per-unit via _worker_v5 - fork: shared memory via _worker_v5_shared (unchanged) Safety: RuntimeError if ALL parallel units fail. Always warn if any units fail (was verbose>=2 only). Tested: 256-core GSI server, 18 sectors, 2.7M rows, n_workers=16.
1 parent b9985e9 commit 917c361

1 file changed

Lines changed: 41 additions & 5 deletions

File tree

UTILS/dfextensions/groupby_regression/groupby_regression_sliding_window.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3229,12 +3229,22 @@ def _csort(keys, n_groups):
32293229

32303230

32313231
def _worker_init():
3232-
"""Set NUMBA_NUM_THREADS=1 inside worker to prevent oversubscription."""
3232+
"""Set NUMBA_NUM_THREADS=1 inside worker to prevent oversubscription.
3233+
3234+
Must set env var BEFORE importing numba, as numba reads it at import time.
3235+
Handles the case where Numba is already imported (in-process n_workers=1).
3236+
"""
3237+
# Set env var first — affects any future numba import in child processes
32333238
os.environ['NUMBA_NUM_THREADS'] = '1'
3239+
os.environ['NUMBA_THREADING_LAYER'] = 'workqueue'
32343240
try:
32353241
import numba
3236-
numba.config.THREADING_LAYER_PRIORITY = ['workqueue']
3237-
except Exception:
3242+
# If numba already imported, try to adjust thread count
3243+
try:
3244+
numba.set_num_threads(1)
3245+
except Exception:
3246+
pass # Threads already launched — continue with current count
3247+
except ImportError:
32383248
pass
32393249

32403250

@@ -3268,7 +3278,12 @@ def _worker_v5_shared(
32683278
from the parent — zero pickle overhead for big arrays.
32693279
Only (start, end) integers + small config args are pickled.
32703280
"""
3271-
_worker_init()
3281+
# Suppress Numba threading conflicts in forked workers
3282+
try:
3283+
import numba
3284+
numba.set_num_threads(1)
3285+
except Exception:
3286+
pass
32723287
try:
32733288
row_indices = _shared_order[start:end]
32743289
gb_unit = _shared_gb_arrays[row_indices]
@@ -3339,7 +3354,12 @@ def _worker_v5(
33393354
Reconstructs a minimal DataFrame for _flatten_bins_for_v5, then calls
33403355
V5 arrays path directly. Returns (unit_id, result_df) or (unit_id, error_str).
33413356
"""
3342-
_worker_init()
3357+
# Suppress Numba threading conflicts in forked workers
3358+
try:
3359+
import numba
3360+
numba.set_num_threads(1)
3361+
except Exception:
3362+
pass
33433363
try:
33443364
# Reconstruct minimal DataFrame from arrays
33453365
data = {}
@@ -3589,6 +3609,13 @@ def make_sliding_window_fit_parallel(
35893609
_shared_y_array = y_array
35903610
_shared_order = order
35913611

3612+
# Prevent Numba thread conflicts in child processes:
3613+
# Save parent env, set threads=1 BEFORE fork, restore after.
3614+
_orig_numba_threads = os.environ.get('NUMBA_NUM_THREADS')
3615+
_orig_numba_layer = os.environ.get('NUMBA_THREADING_LAYER')
3616+
os.environ['NUMBA_NUM_THREADS'] = '1'
3617+
os.environ['NUMBA_THREADING_LAYER'] = 'workqueue'
3618+
35923619
try:
35933620
futures = {}
35943621
with ProcessPoolExecutor(max_workers=n_workers) as executor:
@@ -3631,6 +3658,15 @@ def make_sliding_window_fit_parallel(
36313658
_shared_x_array = None
36323659
_shared_y_array = None
36333660
_shared_order = None
3661+
# Restore parent's Numba env
3662+
if _orig_numba_threads is not None:
3663+
os.environ['NUMBA_NUM_THREADS'] = _orig_numba_threads
3664+
elif 'NUMBA_NUM_THREADS' in os.environ:
3665+
del os.environ['NUMBA_NUM_THREADS']
3666+
if _orig_numba_layer is not None:
3667+
os.environ['NUMBA_THREADING_LAYER'] = _orig_numba_layer
3668+
elif 'NUMBA_THREADING_LAYER' in os.environ:
3669+
del os.environ['NUMBA_THREADING_LAYER']
36343670

36353671
t1_workers = time.time()
36363672

0 commit comments

Comments
 (0)