Add expression-based dask operations for array query planning#481
Add expression-based dask operations for array query planning#481dcherian wants to merge 1 commit intoxarray-contrib:mainfrom
Conversation
Adds flox/expr.py with expression-system compatible implementations: - dask_groupby_agg: map-reduce, cohorts, blockwise methods - dask_groupby_scan: cumsum, ffill, bfill with Blelloch algorithm - Expression classes: ExtractFromDictExpr, CollapseBlocksExpr, SubsetBlocksExpr Uses Task/TaskRef/Alias from dask._task_spec for proper task specification. Dispatches based on dask.array.ARRAY_EXPR_ENABLED flag. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| groups_.append(cohort_index.values) | ||
|
|
||
| # Concatenate cohort results along the last axis | ||
| reduced = dask.array.concatenate(cohort_results, axis=-1) |
There was a problem hiding this comment.
the other version of this code does an implicit concat in the _tree_reduce. cohort_results can be O(10_000) arrays long (#415). Should we use that optimization here?
that PR also optimizes _tree_reduce for very large arrays; perhaps Claude can upstream that to dask too ;)
| # If reverse_result is True, we handle it specially to avoid expression | ||
| # optimization issues (slicing scan results triggers problematic | ||
| # optimizations that push slices into cumreduction). |
There was a problem hiding this comment.
seems like a bug to fix?
|
One thing we could chat about is how to handle rechunking heuristics. for example, with resampling problems, a tiny amount of rechunking can be very beneficial to avoiding communication. In https://github.com/xarray-contrib/flox/pull/380/changes, i do "local reasoning" with some heuristics. Is there a way to do global reasoning here? For example flox could always set a "blockwise groupby" layer for resampling problems, and dask could query it for a menu of possible chunk schemes that work well for that layer. Perhaps that's too hard. Here's an example of why I want something like this: https://discourse.pangeo.io/t/best-practice-advice-on-parallel-processing-a-suite-of-zarr-files-with-dask-and-xarray/5201/6?u=dcherian Another example: https://discourse.pangeo.io/t/xarray-memoryerror-with-groupby-workloads/4273/16?u=dcherian |
Adds flox/expr.py with expression-system compatible implementations:
Uses Task/TaskRef/Alias from dask._task_spec for proper task specification. Dispatches based on dask.array.ARRAY_EXPR_ENABLED flag.
🤖 Generated with Claude Code