Skip to content

Commit 4a51ee3

Browse files
timsaucer and claude committed
Add DataFrame.window() and unnest recursion options
Expose the remaining DataFrame methods from upstream DataFusion. Closes #1456.

- window(*exprs): apply window function expressions and append the results as new columns.
- unnest_column/unnest_columns: add an optional recursions parameter that controls unnest depth via (input_column, output_column, depth) tuples.

Note: drop_columns is already exposed as the existing drop() method.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8496662 commit 4a51ee3

File tree

3 files changed

+100
-13
lines changed

3 files changed

+100
-13
lines changed

crates/core/src/dataframe.rs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,14 @@ impl PyDataFrame {
582582
Ok(Self::new(df))
583583
}
584584

585+
/// Apply window function expressions to the DataFrame
586+
#[pyo3(signature = (*exprs))]
587+
fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
588+
let window_exprs = exprs.into_iter().map(|e| e.into()).collect();
589+
let df = self.df.as_ref().clone().window(window_exprs)?;
590+
Ok(Self::new(df))
591+
}
592+
585593
fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
586594
let df = self.df.as_ref().clone().filter(predicate.into())?;
587595
Ok(Self::new(df))
@@ -891,11 +899,14 @@ impl PyDataFrame {
891899
Ok(Self::new(new_df))
892900
}
893901

894-
#[pyo3(signature = (column, preserve_nulls=true))]
895-
fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> {
896-
// TODO: expose RecursionUnnestOptions
897-
// REF: https://github.com/apache/datafusion/pull/11577
898-
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
902+
#[pyo3(signature = (column, preserve_nulls=true, recursions=None))]
903+
fn unnest_column(
904+
&self,
905+
column: &str,
906+
preserve_nulls: bool,
907+
recursions: Option<Vec<(String, String, usize)>>,
908+
) -> PyDataFusionResult<Self> {
909+
let unnest_options = build_unnest_options(preserve_nulls, recursions);
899910
let df = self
900911
.df
901912
.as_ref()
@@ -904,15 +915,14 @@ impl PyDataFrame {
904915
Ok(Self::new(df))
905916
}
906917

907-
#[pyo3(signature = (columns, preserve_nulls=true))]
918+
#[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
908919
fn unnest_columns(
909920
&self,
910921
columns: Vec<String>,
911922
preserve_nulls: bool,
923+
recursions: Option<Vec<(String, String, usize)>>,
912924
) -> PyDataFusionResult<Self> {
913-
// TODO: expose RecursionUnnestOptions
914-
// REF: https://github.com/apache/datafusion/pull/11577
915-
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
925+
let unnest_options = build_unnest_options(preserve_nulls, recursions);
916926
let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
917927
let df = self
918928
.df
@@ -1376,6 +1386,26 @@ impl PyDataFrameWriteOptions {
13761386
}
13771387
}
13781388

1389+
fn build_unnest_options(
1390+
preserve_nulls: bool,
1391+
recursions: Option<Vec<(String, String, usize)>>,
1392+
) -> UnnestOptions {
1393+
let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
1394+
if let Some(recs) = recursions {
1395+
opts.recursions = recs
1396+
.into_iter()
1397+
.map(
1398+
|(input, output, depth)| datafusion::common::RecursionUnnestOption {
1399+
input_column: datafusion::common::Column::from(input.as_str()),
1400+
output_column: datafusion::common::Column::from(output.as_str()),
1401+
depth,
1402+
},
1403+
)
1404+
.collect();
1405+
}
1406+
opts
1407+
}
1408+
13791409
/// Print DataFrame
13801410
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
13811411
// Get string representation of record batches

python/datafusion/dataframe.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,21 @@ def drop(self, *columns: str) -> DataFrame:
487487
"""
488488
return DataFrame(self.df.drop(*columns))
489489

490+
def window(self, *exprs: Expr) -> DataFrame:
    """Evaluate window expressions and return the resulting DataFrame.

    Each expression is converted to its raw form and passed on to the
    wrapped DataFrame's ``window`` method; the results are appended as new
    columns.

    Args:
        exprs: Window function expressions to evaluate.

    Returns:
        DataFrame with new window function columns appended.
    """
    return DataFrame(self.df.window(*expr_list_to_raw_expr_list(exprs)))
504+
490505
def filter(self, *predicates: Expr | str) -> DataFrame:
491506
"""Return a DataFrame for which ``predicate`` evaluates to ``True``.
492507
@@ -1426,23 +1441,44 @@ def count(self) -> int:
14261441
return self.df.count()
14271442

14281443
@deprecated("Use :py:func:`unnest_columns` instead.")
1429-
def unnest_column(self, column: str, preserve_nulls: bool = True) -> DataFrame:
1444+
def unnest_column(
1445+
self,
1446+
column: str,
1447+
preserve_nulls: bool = True,
1448+
recursions: list[tuple[str, str, int]] | None = None,
1449+
) -> DataFrame:
14301450
"""See :py:func:`unnest_columns`."""
1431-
return DataFrame(self.df.unnest_column(column, preserve_nulls=preserve_nulls))
1451+
return DataFrame(
1452+
self.df.unnest_column(
1453+
column, preserve_nulls=preserve_nulls, recursions=recursions
1454+
)
1455+
)
14321456

1433-
def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame:
1457+
def unnest_columns(
1458+
self,
1459+
*columns: str,
1460+
preserve_nulls: bool = True,
1461+
recursions: list[tuple[str, str, int]] | None = None,
1462+
) -> DataFrame:
14341463
"""Expand columns of arrays into a single row per array element.
14351464
14361465
Args:
14371466
columns: Column names to perform unnest operation on.
14381467
preserve_nulls: If False, rows with null entries will not be
14391468
returned.
1469+
recursions: Optional list of ``(input_column, output_column, depth)``
1470+
tuples that control how deeply nested columns are unnested. Any
1471+
column not mentioned here is unnested with depth 1.
14401472
14411473
Returns:
14421474
A DataFrame with the columns expanded.
14431475
"""
14441476
columns = list(columns)
1445-
return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
1477+
return DataFrame(
1478+
self.df.unnest_columns(
1479+
columns, preserve_nulls=preserve_nulls, recursions=recursions
1480+
)
1481+
)
14461482

14471483
def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
14481484
"""Export the DataFrame as an Arrow C Stream.

python/tests/test_dataframe.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3654,3 +3654,24 @@ def test_explain_with_format(capsys):
36543654
df.explain(verbose=True, analyze=True, format=ExplainFormat.INDENT)
36553655
captured = capsys.readouterr()
36563656
assert "plan_type" in captured.out
3657+
3658+
3659+
def test_window():
    """``DataFrame.window`` appends the window result as a new column.

    The rows are sorted on ``a`` and gathered with ``to_pydict`` so the
    assertions depend neither on how many record batches come back nor on
    the order in which partitions are emitted.
    """
    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "x", "y"]})
    row_number = f.row_number(
        partition_by=[column("b")], order_by=[column("a")]
    ).alias("rn")

    result = df.window(row_number).sort(column("a")).to_pydict()

    assert "rn" in result
    # Sorting on ``a`` pins the row order, so ``rn`` lines up with it.
    assert result["a"] == [1, 2, 3]
    assert result["rn"] == [1, 2, 1]
3667+
3668+
3669+
def test_unnest_columns_with_recursions():
    """``unnest_columns`` gives the same rows with and without ``recursions``.

    Column ``a`` is unnested once without options and once with an explicit
    depth-1 recursion entry; both results are checked against the same
    expected values.
    """
    ctx = SessionContext()
    df = ctx.from_pydict({"a": [[1, 2], [3]], "b": ["x", "y"]})
    expected = [1, 2, 3]

    # Basic unnest still works
    plain = df.unnest_columns("a").collect()[0]
    assert plain.column(0).to_pylist() == expected

    # With explicit recursion options
    explicit = df.unnest_columns("a", recursions=[("a", "a", 1)]).collect()[0]
    assert explicit.column(0).to_pylist() == expected

0 commit comments

Comments
 (0)