From 8bf79e5597e27fa86c828fbb3eef2df004bb1d42 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 18 Mar 2026 22:42:43 +0800 Subject: [PATCH] linearized operands in physical binaryexpr to avoid recursion limit --- datafusion/proto/proto/datafusion.proto | 3 + datafusion/proto/src/generated/pbjson.rs | 17 ++ datafusion/proto/src/generated/prost.rs | 4 + .../proto/src/physical_plan/from_proto.rs | 72 +++-- .../proto/src/physical_plan/to_proto.rs | 37 ++- .../tests/cases/roundtrip_physical_plan.rs | 287 ++++++++++++++++++ 6 files changed, 394 insertions(+), 26 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e422ce7bed4f..6e3379ce8f28 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -981,6 +981,9 @@ message PhysicalBinaryExprNode { PhysicalExprNode l = 1; PhysicalExprNode r = 2; string op = 3; + // Linearized operands for chains of the same operator (e.g. a AND b AND c). + // When present, `l` and `r` are ignored and `operands` holds the flattened list. + repeated PhysicalExprNode operands = 4; } message PhysicalDateTimeIntervalExprNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index eb86afe3d6e0..7fe9e4073e81 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -15899,6 +15899,9 @@ impl serde::Serialize for PhysicalBinaryExprNode { if !self.op.is_empty() { len += 1; } + if !self.operands.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalBinaryExprNode", len)?; if let Some(v) = self.l.as_ref() { struct_ser.serialize_field("l", v)?; @@ -15909,6 +15912,9 @@ impl serde::Serialize for PhysicalBinaryExprNode { if !self.op.is_empty() { struct_ser.serialize_field("op", &self.op)?; } + if !self.operands.is_empty() { + struct_ser.serialize_field("operands", &self.operands)?; + } struct_ser.end() } } @@ -15922,6 +15928,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalBinaryExprNode { "l", "r", "op", + "operands", ]; #[allow(clippy::enum_variant_names)] @@ -15929,6 +15936,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalBinaryExprNode { L, R, Op, + Operands, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -15953,6 +15961,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalBinaryExprNode { "l" => Ok(GeneratedField::L), "r" => Ok(GeneratedField::R), "op" => Ok(GeneratedField::Op), + "operands" => Ok(GeneratedField::Operands), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -15975,6 +15984,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalBinaryExprNode { let mut l__ = None; let mut r__ = None; let mut op__ = None; + let mut operands__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::L => { @@ -15995,12 +16005,19 @@ impl<'de> serde::Deserialize<'de> for PhysicalBinaryExprNode { } op__ = Some(map_.next_value()?); } + GeneratedField::Operands => { + if operands__.is_some() { + return Err(serde::de::Error::duplicate_field("operands")); + } + operands__ = Some(map_.next_value()?); + } } } Ok(PhysicalBinaryExprNode { l: l__, r: r__, op: op__.unwrap_or_default(), + operands: operands__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index e0a0c636fbb3..778f7eec140e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1478,6 +1478,10 @@ pub struct PhysicalBinaryExprNode { pub r: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(string, tag = "3")] pub op: ::prost::alloc::string::String, + /// Linearized operands for chains of the same operator (e.g. a AND b AND c). + /// When present, `l` and `r` are ignored and `operands` holds the flattened list. + #[prost(message, repeated, tag = "4")] + pub operands: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalDateTimeIntervalExprNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index c7a4bd822663..bc2770fe97e3 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -273,25 +273,59 @@ pub fn parse_physical_expr_with_converter( } ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)), ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), - ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new( - parse_required_physical_expr( - binary_expr.l.as_deref(), - ctx, - "left", - input_schema, - codec, - proto_converter, - )?, - logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?, - parse_required_physical_expr( - binary_expr.r.as_deref(), - ctx, - "right", - input_schema, - codec, - proto_converter, - )?, - )), + ExprType::BinaryExpr(binary_expr) => { + let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?; + if !binary_expr.operands.is_empty() { + // New linearized format: reduce the flat operands list back into + // a nested binary expression tree. + let operands: Vec> = binary_expr + .operands + .iter() + .map(|e| { + proto_converter.proto_to_physical_expr( + e, + ctx, + input_schema, + codec, + ) + }) + .collect::>>()?; + + if operands.len() < 2 { + return Err(proto_error( + "A binary expression must always have at least 2 operands", + )); + } + + operands + .into_iter() + .reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right))) + .expect( + "Binary expression could not be reduced to a single expression.", + ) + } else { + // Legacy format with l/r fields + Arc::new(BinaryExpr::new( + parse_required_physical_expr( + binary_expr.l.as_deref(), + ctx, + "left", + input_schema, + codec, + proto_converter, + )?, + op, + parse_required_physical_expr( + binary_expr.r.as_deref(), + ctx, + "right", + input_schema, + codec, + proto_converter, + )?, + )) + } + } ExprType::AggregateExpr(_) => { return not_impl_err!( "Cannot convert aggregate expr node to physical expression" diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 990a54cf94c7..311e4c45ea6d 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -307,14 +307,37 @@ pub fn serialize_physical_expr_with_converter( )), }) } else if let Some(expr) = expr.downcast_ref::() { + // Linearize a nested binary expression tree of the same operator + // into a flat vector of operands to avoid deep recursion in proto. + let op = expr.op(); + let mut operand_refs: Vec<&Arc> = vec![expr.right()]; + let mut current_expr: &BinaryExpr = expr; + loop { + match current_expr.left().as_any().downcast_ref::() { + Some(bin) if bin.op() == op => { + operand_refs.push(bin.right()); + current_expr = bin; + } + _ => { + operand_refs.push(current_expr.left()); + break; + } + } + } + + // Reverse so operands are ordered from left innermost to right outermost + operand_refs.reverse(); + + let operands = operand_refs + .iter() + .map(|e| proto_converter.physical_expr_to_proto(e, codec)) + .collect::>>()?; + let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode { - l: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.left(), codec)?, - )), - r: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.right(), codec)?, - )), - op: format!("{:?}", expr.op()), + l: None, + r: None, + op: format!("{:?}", op), + operands, }); Ok(protobuf::PhysicalExprNode { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0a5ed766e6cc..1e086281bef4 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3196,3 +3196,290 @@ fn roundtrip_lead_with_default_value() -> Result<()> { true, )?)) } + +/// Test that a chain of the same operator (a AND b AND c) is linearized +/// and roundtrips correctly. +#[test] +fn roundtrip_binary_expr_chain_same_op() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let field_b = Field::new("b", DataType::Boolean, false); + let field_c = Field::new("c", DataType::Boolean, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c])); + let ab = binary( + col("a", &schema)?, + Operator::And, + col("b", &schema)?, + &schema, + )?; + let abc = binary(ab, Operator::And, col("c", &schema)?, &schema)?; + roundtrip_test(Arc::new(FilterExec::try_new( + abc, + Arc::new(EmptyExec::new(schema)), + )?)) +} + +/// Test that mixed operators (a AND b OR c) are NOT linearized together — +/// only chains of the same operator are flattened. +#[test] +fn roundtrip_binary_expr_mixed_ops() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let field_b = Field::new("b", DataType::Boolean, false); + let field_c = Field::new("c", DataType::Boolean, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c])); + // (a AND b) OR c — AND and OR are different operators, so linearization stops + let a_and_b = binary( + col("a", &schema)?, + Operator::And, + col("b", &schema)?, + &schema, + )?; + let expr = binary(a_and_b, Operator::Or, col("c", &schema)?, &schema)?; + roundtrip_test(Arc::new(FilterExec::try_new( + expr, + Arc::new(EmptyExec::new(schema)), + )?)) +} + +/// Test that a deeply nested chain of AND expressions (like many WHERE conditions) +/// roundtrips correctly. This is the scenario from issue #18602. +#[test] +fn roundtrip_binary_expr_deeply_nested_and_chain() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let schema = Arc::new(Schema::new(vec![field_a])); + + // Build a chain: a AND a AND a AND ... (100 times) + let col_a = col("a", &schema)?; + let mut expr = Arc::clone(&col_a); + for _ in 0..99 { + expr = binary(expr, Operator::And, Arc::clone(&col_a), &schema)?; + } + + roundtrip_test(Arc::new(FilterExec::try_new( + expr, + Arc::new(EmptyExec::new(schema)), + )?)) +} + +/// Test that a deeply nested chain of OR expressions roundtrips correctly. +#[test] +fn roundtrip_binary_expr_deeply_nested_or_chain() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let schema = Arc::new(Schema::new(vec![field_a])); + + let col_a = col("a", &schema)?; + let mut expr = Arc::clone(&col_a); + for _ in 0..99 { + expr = binary(expr, Operator::Or, Arc::clone(&col_a), &schema)?; + } + + roundtrip_test(Arc::new(FilterExec::try_new( + expr, + Arc::new(EmptyExec::new(schema)), + )?)) +} + +/// Test that alternating AND/OR operators produce correct results — +/// each sub-chain gets linearized independently. +#[test] +fn roundtrip_binary_expr_alternating_and_or() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let field_b = Field::new("b", DataType::Boolean, false); + let field_c = Field::new("c", DataType::Boolean, false); + let field_d = Field::new("d", DataType::Boolean, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c, field_d])); + + // (a AND b) OR (c AND d) + let a_and_b = binary( + col("a", &schema)?, + Operator::And, + col("b", &schema)?, + &schema, + )?; + let c_and_d = binary( + col("c", &schema)?, + Operator::And, + col("d", &schema)?, + &schema, + )?; + let expr = binary(a_and_b, Operator::Or, c_and_d, &schema)?; + + roundtrip_test(Arc::new(FilterExec::try_new( + expr, + Arc::new(EmptyExec::new(schema)), + )?)) +} + +/// Verify that the linearized proto format has a flat operands list +/// rather than deeply nested l/r fields. +#[test] +fn test_linearization_produces_flat_operands() -> Result<()> { + // Build: a AND a AND a AND a (4 operands, 3 levels of nesting) + let col_a: Arc = Arc::new(Column::new("a", 0)); + let expr: Arc = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::And, + Arc::clone(&col_a), + )), + Operator::And, + Arc::clone(&col_a), + )), + Operator::And, + Arc::clone(&col_a), + )); + + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let proto = proto_converter.physical_expr_to_proto(&expr, &codec)?; + + // The top-level should use the operands field with 4 entries + match &proto.expr_type { + Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => { + assert!( + b.l.is_none(), + "l should be None when using linearized operands" + ); + assert!( + b.r.is_none(), + "r should be None when using linearized operands" + ); + assert_eq!( + b.operands.len(), + 4, + "Expected 4 linearized operands for a AND a AND a AND a" + ); + assert_eq!(b.op, "And"); + } + other => panic!("Expected BinaryExpr, got {other:?}"), + } + + Ok(()) +} + +/// Test that linearization stops when encountering a different operator. +/// For (a AND b) OR c, only the top-level OR should be represented, and +/// the left-hand AND subtree should be a separate nested BinaryExpr. +#[test] +fn test_linearization_stops_at_different_op() -> Result<()> { + // (a AND b) OR c + let a_and_b: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::And, + Arc::new(Column::new("b", 1)), + )); + let expr: Arc = Arc::new(BinaryExpr::new( + a_and_b, + Operator::Or, + Arc::new(Column::new("c", 2)), + )); + + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let proto = proto_converter.physical_expr_to_proto(&expr, &codec)?; + + // The top-level OR should have only 2 operands (can't linearize through AND) + match &proto.expr_type { + Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => { + assert_eq!( + b.operands.len(), + 2, + "Expected 2 operands for (a AND b) OR c" + ); + assert_eq!(b.op, "Or"); + // The first operand should be a nested AND BinaryExpr + match &b.operands[0].expr_type { + Some(protobuf::physical_expr_node::ExprType::BinaryExpr(inner)) => { + assert_eq!(inner.op, "And"); + assert_eq!(inner.operands.len(), 2); + } + other => panic!("Expected inner BinaryExpr(AND), got {other:?}"), + } + } + other => panic!("Expected BinaryExpr, got {other:?}"), + } + + Ok(()) +} + +/// returns a SessionContext with an empty `netflow` table registered +fn netflow_context() -> Result { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("dst_geo_country_name", DataType::Utf8, true), + Field::new("dst_geo_city_name", DataType::Utf8, true), + Field::new("packets", DataType::UInt64, true), + Field::new("src_addr", DataType::Utf8, true), + Field::new("dst_addr", DataType::Utf8, true), + ])); + + ctx.register_table("netflow", Arc::new(EmptyTable::new(schema)))?; + + Ok(ctx) +} + +/// Regression test for issue #18602: +/// https://github.com/apache/datafusion/issues/18602 +/// +/// The physical filter expression here contains a long chain of `AND` predicates. +/// Before linearizing `PhysicalBinaryExprNode`, encoding then decoding the protobuf +/// could fail with `DecodeError: recursion limit reached`. +#[tokio::test] +async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { + let ctx = netflow_context()?; + let sql = "SELECT \ + dst_geo_country_name AS x_axis_1, \ + dst_geo_city_name AS x_axis_2, \ + sum(packets) AS y_axis_1 \ + FROM netflow \ + WHERE dst_geo_country_name IS NOT NULL \ + AND src_addr NOT LIKE '10.201.%' \ + AND dst_addr NOT LIKE '10.201.%' \ + AND src_addr NOT LIKE '10.202.%' \ + AND dst_addr NOT LIKE '10.202.%' \ + AND src_addr NOT LIKE '10.203.%' \ + AND dst_addr NOT LIKE '10.203.%' \ + AND src_addr NOT LIKE '10.204.%' \ + AND dst_addr NOT LIKE '10.204.%' \ + AND src_addr NOT LIKE '172.16.186.%' \ + AND dst_addr NOT LIKE '172.16.186.%' \ + AND src_addr NOT LIKE '172.16.187.%' \ + AND dst_addr NOT LIKE '172.16.187.%' \ + AND src_addr NOT LIKE '172.16.188.%' \ + AND dst_addr NOT LIKE '172.16.188.%' \ + AND src_addr NOT LIKE '10.102.45.%' \ + AND dst_addr NOT LIKE '10.102.45.%' \ + AND src_addr NOT LIKE '172.25.210.%' \ + AND dst_addr NOT LIKE '172.25.210.%' \ + AND src_addr NOT LIKE '172.25.211.%' \ + AND dst_addr NOT LIKE '172.25.211.%' \ + AND src_addr NOT LIKE '141.226.101.%' \ + AND dst_addr NOT LIKE '141.226.101.%' \ + AND src_addr NOT LIKE '167.86.40.%' \ + AND dst_addr NOT LIKE '167.86.40.%' \ + AND src_addr NOT LIKE '66.22.38.%' \ + AND dst_addr NOT LIKE '66.22.38.%' \ + AND src_addr != '168.143.191.55' \ + AND dst_addr != '168.143.191.55' \ + AND src_addr != '82.112.107.142' \ + AND dst_addr != '82.112.107.142' \ + AND src_addr != '20.76.39.176' \ + AND dst_addr != '20.76.39.176' \ + AND src_addr != '162.159.129.83' \ + AND dst_addr != '162.159.129.83' \ + AND src_addr != '34.201.223.155' \ + AND dst_addr != '34.201.223.155' \ + AND src_addr != '34.201.223.156' \ + AND dst_addr != '34.201.223.156' \ + AND src_addr != '34.201.223.157' \ + AND dst_addr != '34.201.223.157' \ + AND src_addr != '134.201.223.157' \ + AND dst_addr != '134.201.223.157' \ + AND src_addr != '341.201.223.157' \ + AND dst_addr != '341.201.223.157' \ + GROUP BY x_axis_1, x_axis_2 \ + ORDER BY y_axis_1 DESC \ + LIMIT 20"; + + roundtrip_test_sql_with_context(sql, &ctx).await +}