Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,9 @@ message PhysicalBinaryExprNode {
PhysicalExprNode l = 1;
PhysicalExprNode r = 2;
string op = 3;
// Linearized operands for chains of the same operator (e.g. a AND b AND c).
// When present, `l` and `r` are ignored and `operands` holds the flattened list.
repeated PhysicalExprNode operands = 4;
}

message PhysicalDateTimeIntervalExprNode {
Expand Down
17 changes: 17 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 53 additions & 19 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,25 +273,59 @@ pub fn parse_physical_expr_with_converter(
}
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
parse_required_physical_expr(
binary_expr.l.as_deref(),
ctx,
"left",
input_schema,
codec,
proto_converter,
)?,
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_expr(
binary_expr.r.as_deref(),
ctx,
"right",
input_schema,
codec,
proto_converter,
)?,
)),
ExprType::BinaryExpr(binary_expr) => {
let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?;
if !binary_expr.operands.is_empty() {
// New linearized format: reduce the flat operands list back into
// a nested binary expression tree.
let operands: Vec<Arc<dyn PhysicalExpr>> = binary_expr
.operands
.iter()
.map(|e| {
proto_converter.proto_to_physical_expr(
e,
ctx,
input_schema,
codec,
)
})
.collect::<Result<Vec<_>>>()?;

if operands.len() < 2 {
return Err(proto_error(
"A binary expression must always have at least 2 operands",
));
}

operands
.into_iter()
.reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right)))
.expect(
"Binary expression could not be reduced to a single expression.",
)
} else {
// Legacy format with l/r fields
Arc::new(BinaryExpr::new(
parse_required_physical_expr(
binary_expr.l.as_deref(),
ctx,
"left",
input_schema,
codec,
proto_converter,
)?,
op,
parse_required_physical_expr(
binary_expr.r.as_deref(),
ctx,
"right",
input_schema,
codec,
proto_converter,
)?,
))
}
}
ExprType::AggregateExpr(_) => {
return not_impl_err!(
"Cannot convert aggregate expr node to physical expression"
Expand Down
37 changes: 30 additions & 7 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,37 @@ pub fn serialize_physical_expr_with_converter(
)),
})
} else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
// Linearize a nested binary expression tree of the same operator
// into a flat vector of operands to avoid deep recursion in proto.
let op = expr.op();
let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![expr.right()];
let mut current_expr: &BinaryExpr = expr;
loop {
match current_expr.left().as_any().downcast_ref::<BinaryExpr>() {
Some(bin) if bin.op() == op => {
operand_refs.push(bin.right());
current_expr = bin;
}
_ => {
operand_refs.push(current_expr.left());
break;
}
}
}

// Reverse so operands are ordered from left innermost to right outermost
operand_refs.reverse();

let operands = operand_refs
.iter()
.map(|e| proto_converter.physical_expr_to_proto(e, codec))
.collect::<Result<Vec<_>>>()?;

let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
l: Some(Box::new(
proto_converter.physical_expr_to_proto(expr.left(), codec)?,
)),
r: Some(Box::new(
proto_converter.physical_expr_to_proto(expr.right(), codec)?,
)),
op: format!("{:?}", expr.op()),
l: None,
r: None,
op: format!("{:?}", op),
operands,
});

Ok(protobuf::PhysicalExprNode {
Expand Down
Loading
Loading