Skip to content

Commit 3443110

Browse files
committed
Upgrade to DF 48
1 parent 0cc9b0a commit 3443110

File tree

8 files changed

+159
-161
lines changed

8 files changed

+159
-161
lines changed

Cargo.lock

Lines changed: 100 additions & 117 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ substrait = ["dep:datafusion-substrait"]
3737
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
3838
pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
3939
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
40-
arrow = { version = "55.0.0", features = ["pyarrow"] }
41-
datafusion = { version = "47.0.0", features = ["avro", "unicode_expressions"] }
42-
datafusion-substrait = { version = "47.0.0", optional = true }
43-
datafusion-proto = { version = "47.0.0" }
44-
datafusion-ffi = { version = "47.0.0" }
40+
arrow = { version = "55.1.0", features = ["pyarrow"] }
41+
datafusion = { version = "48.0.0", features = ["avro", "unicode_expressions"] }
42+
datafusion-substrait = { version = "48.0.0", optional = true }
43+
datafusion-proto = { version = "48.0.0" }
44+
datafusion-ffi = { version = "48.0.0" }
4545
prost = "0.13.1" # keep in line with `datafusion-substrait`
4646
uuid = { version = "1.16", features = ["v4"] }
4747
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
@@ -61,3 +61,10 @@ crate-type = ["cdylib", "rlib"]
6161
[profile.release]
6262
lto = true
6363
codegen-units = 1
64+
65+
# TODO remove before merging to main
66+
[patch.crates-io]
67+
datafusion = { git = "https://github.com/apache/datafusion", rev = "33a32d4382bee7e3c705d0f55d05c24a115a2f98" }
68+
datafusion-substrait = { git = "https://github.com/apache/datafusion", rev = "33a32d4382bee7e3c705d0f55d05c24a115a2f98" }
69+
datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "33a32d4382bee7e3c705d0f55d05c24a115a2f98" }
70+
datafusion-ffi = { git = "https://github.com/apache/datafusion", rev = "33a32d4382bee7e3c705d0f55d05c24a115a2f98" }

src/expr.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use pyo3::IntoPyObjectExt;
2424
use pyo3::{basic::CompareOp, prelude::*};
2525
use std::collections::HashMap;
2626
use std::convert::{From, Into};
27+
use std::env::args;
2728
use std::sync::Arc;
2829
use window::PyWindowFrame;
2930

@@ -150,7 +151,7 @@ impl PyExpr {
150151
Ok(PyScalarVariable::new(data_type, variables).into_bound_py_any(py)?)
151152
}
152153
Expr::Like(value) => Ok(PyLike::from(value.clone()).into_bound_py_any(py)?),
153-
Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_bound_py_any(py)?),
154+
Expr::Literal(value, metadata) => Ok(PyLiteral::new_with_metadata(value.clone(), metadata.clone()).into_bound_py_any(py)?),
154155
Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_bound_py_any(py)?),
155156
Expr::Not(expr) => Ok(PyNot::new(*expr.clone()).into_bound_py_any(py)?),
156157
Expr::IsNotNull(expr) => Ok(PyIsNotNull::new(*expr.clone()).into_bound_py_any(py)?),
@@ -377,7 +378,7 @@ impl PyExpr {
377378
/// Extracts the Expr value into a PyObject that can be shared with Python
378379
pub fn python_value(&self, py: Python) -> PyResult<PyObject> {
379380
match &self.expr {
380-
Expr::Literal(scalar_value) => scalar_to_pyarrow(scalar_value, py),
381+
Expr::Literal(scalar_value, _) => scalar_to_pyarrow(scalar_value, py),
381382
_ => Err(py_type_err(format!(
382383
"Non Expr::Literal encountered in types: {:?}",
383384
&self.expr
@@ -417,11 +418,13 @@ impl PyExpr {
417418
params: AggregateFunctionParams { args, .. },
418419
..
419420
})
420-
| Expr::ScalarFunction(ScalarFunction { args, .. })
421-
| Expr::WindowFunction(WindowFunction {
422-
params: WindowFunctionParams { args, .. },
423-
..
424-
}) => Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect()),
421+
| Expr::ScalarFunction(ScalarFunction { args, .. }) => {
422+
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
423+
}
424+
Expr::WindowFunction(boxed_window_fn) => {
425+
let args = &boxed_window_fn.params.args;
426+
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
427+
}
425428

426429
// Expr(s) that require more specific processing
427430
Expr::Case(Case {
@@ -600,10 +603,10 @@ impl PyExpr {
600603
) -> PyDataFusionResult<PyExpr> {
601604
match &self.expr {
602605
Expr::AggregateFunction(agg_fn) => {
603-
let window_fn = Expr::WindowFunction(WindowFunction::new(
606+
let window_fn = Expr::WindowFunction(Box::new(WindowFunction::new(
604607
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
605608
agg_fn.params.args.clone(),
606-
));
609+
)));
607610

608611
add_builder_fns_to_window(
609612
window_fn,
@@ -743,7 +746,7 @@ impl PyExpr {
743746
| Operator::QuestionPipe => Err(py_type_err(format!("Unsupported expr: ${op}"))),
744747
},
745748
Expr::Cast(Cast { expr: _, data_type }) => DataTypeMap::map_from_arrow_type(data_type),
746-
Expr::Literal(scalar_value) => DataTypeMap::map_from_scalar_value(scalar_value),
749+
Expr::Literal(scalar_value, _) => DataTypeMap::map_from_scalar_value(scalar_value),
747750
_ => Err(py_type_err(format!(
748751
"Non Expr::Literal encountered in types: {:?}",
749752
expr

src/expr/literal.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,23 @@
1818
use crate::errors::PyDataFusionError;
1919
use datafusion::common::ScalarValue;
2020
use pyo3::{prelude::*, IntoPyObjectExt};
21+
use std::collections::{BTreeMap, HashMap};
22+
use std::sync::Arc;
2123

2224
#[pyclass(name = "Literal", module = "datafusion.expr", subclass)]
2325
#[derive(Clone)]
2426
pub struct PyLiteral {
2527
pub value: ScalarValue,
28+
pub metadata: Option<BTreeMap<String, String>>,
29+
}
30+
31+
impl PyLiteral {
32+
pub fn new_with_metadata(
33+
value: ScalarValue,
34+
metadata: Option<BTreeMap<String, String>>,
35+
) -> PyLiteral {
36+
Self { value, metadata }
37+
}
2638
}
2739

2840
impl From<PyLiteral> for ScalarValue {
@@ -33,7 +45,10 @@ impl From<PyLiteral> for ScalarValue {
3345

3446
impl From<ScalarValue> for PyLiteral {
3547
fn from(value: ScalarValue) -> PyLiteral {
36-
PyLiteral { value }
48+
PyLiteral {
49+
value,
50+
metadata: None,
51+
}
3752
}
3853
}
3954

src/expr/window.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -118,51 +118,45 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction {
122-
params: WindowFunctionParams { order_by, .. },
123-
..
124-
}) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(boxed_window_fn) => {
122+
py_sort_expr_list(&boxed_window_fn.params.order_by)
123+
}
125124
other => Err(not_window_function_err(other)),
126125
}
127126
}
128127

129128
/// Return partition by columns in a window function expression
130129
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
131130
match expr.expr.unalias() {
132-
Expr::WindowFunction(WindowFunction {
133-
params: WindowFunctionParams { partition_by, .. },
134-
..
135-
}) => py_expr_list(&partition_by),
131+
Expr::WindowFunction(boxed_window_fn) => {
132+
py_expr_list(&boxed_window_fn.params.partition_by)
133+
}
136134
other => Err(not_window_function_err(other)),
137135
}
138136
}
139137

140138
/// Return input args for window function
141139
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
142140
match expr.expr.unalias() {
143-
Expr::WindowFunction(WindowFunction {
144-
params: WindowFunctionParams { args, .. },
145-
..
146-
}) => py_expr_list(&args),
141+
Expr::WindowFunction(boxed_window_fn) => py_expr_list(&boxed_window_fn.params.args),
147142
other => Err(not_window_function_err(other)),
148143
}
149144
}
150145

151146
/// Return window function name
152147
pub fn window_func_name(&self, expr: PyExpr) -> PyResult<String> {
153148
match expr.expr.unalias() {
154-
Expr::WindowFunction(WindowFunction { fun, .. }) => Ok(fun.to_string()),
149+
Expr::WindowFunction(boxed_window_fn) => Ok(boxed_window_fn.fun.to_string()),
155150
other => Err(not_window_function_err(other)),
156151
}
157152
}
158153

159154
/// Returns a Pywindow frame for a given window function expression
160155
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
161156
match expr.expr.unalias() {
162-
Expr::WindowFunction(WindowFunction {
163-
params: WindowFunctionParams { window_frame, .. },
164-
..
165-
}) => Some(window_frame.into()),
157+
Expr::WindowFunction(boxed_window_fn) => {
158+
Some(boxed_window_fn.params.window_frame.into())
159+
}
166160
_ => None,
167161
}
168162
}

src/functions.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ fn array_cat(exprs: Vec<PyExpr>) -> PyExpr {
103103
#[pyo3(signature = (array, element, index=None))]
104104
fn array_position(array: PyExpr, element: PyExpr, index: Option<i64>) -> PyExpr {
105105
let index = ScalarValue::Int64(index);
106-
let index = Expr::Literal(index);
106+
let index = Expr::Literal(index, None);
107107
datafusion::functions_nested::expr_fn::array_position(array.into(), element.into(), index)
108108
.into()
109109
}
@@ -334,7 +334,7 @@ fn window(
334334
.unwrap_or(WindowFrame::new(order_by.as_ref().map(|v| !v.is_empty())));
335335

336336
Ok(PyExpr {
337-
expr: datafusion::logical_expr::Expr::WindowFunction(WindowFunction {
337+
expr: datafusion::logical_expr::Expr::WindowFunction(Box::new(WindowFunction {
338338
fun,
339339
params: WindowFunctionParams {
340340
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
@@ -351,7 +351,7 @@ fn window(
351351
window_frame,
352352
null_treatment: None,
353353
},
354-
}),
354+
})),
355355
})
356356
}
357357

src/pyarrow_filter_expression.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ fn extract_scalar_list<'py>(
6161
.iter()
6262
.map(|expr| match expr {
6363
// TODO: should we also leverage `ScalarValue::to_pyarrow` here?
64-
Expr::Literal(v) => match v {
64+
Expr::Literal(v, _) => match v {
6565
// The unwraps here are for infallible conversions
6666
ScalarValue::Boolean(Some(b)) => Ok(b.into_bound_py_any(py)?),
6767
ScalarValue::Int8(Some(i)) => Ok(i.into_bound_py_any(py)?),
@@ -106,7 +106,7 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
106106
let op_module = Python::import(py, "operator")?;
107107
let pc_expr: PyDataFusionResult<Bound<'_, PyAny>> = match expr {
108108
Expr::Column(Column { name, .. }) => Ok(pc.getattr("field")?.call1((name,))?),
109-
Expr::Literal(scalar) => Ok(scalar_to_pyarrow(scalar, py)?.into_bound(py)),
109+
Expr::Literal(scalar, _) => Ok(scalar_to_pyarrow(scalar, py)?.into_bound(py)),
110110
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
111111
let operator = operator_to_py(op, &op_module)?;
112112
let left = PyArrowFilterExpression::try_from(left.as_ref())?.0;

src/udwf.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,9 @@ impl WindowUDFImpl for MultiColumnWindowUDF {
300300
&self.signature
301301
}
302302

303-
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<arrow::datatypes::Field> {
303+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<arrow::datatypes::FieldRef> {
304304
// TODO: Should nullable always be `true`?
305-
Ok(arrow::datatypes::Field::new(
306-
field_args.name(),
307-
self.return_type.clone(),
308-
true,
309-
))
305+
Ok(arrow::datatypes::Field::new(field_args.name(), self.return_type.clone(), true).into())
310306
}
311307

312308
// TODO: Enable passing partition_evaluator_args to python?

0 commit comments

Comments
 (0)