Skip to content

Commit 4cd5674

Browse files
feat: add support for generating JSON formatted substrait plan (#1376)
* chore: add new dependencies to pyproject.toml * chore: rename from_json to parse_json * fix: missing import * Fix call to internal function. Drive-by update to equality on logical plan. Switch unit test to focus on json parsing and not byte serialization. * fix: resolve clippy redundant closure lint in substrait.rs --------- Co-authored-by: Tim Saucer <timsaucer@gmail.com>
1 parent 024c86b commit 4cd5674

File tree

7 files changed

+118
-4
lines changed

7 files changed

+118
-4
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ datafusion-substrait = { version = "52", optional = true }
6262
datafusion-proto = { version = "52" }
6363
datafusion-ffi = { version = "52" }
6464
prost = "0.14.1" # keep in line with `datafusion-substrait`
65+
serde_json = "1"
6566
uuid = { version = "1.18", features = ["v4"] }
6667
mimalloc = { version = "0.1", optional = true, default-features = false, features = [
6768
"local_dynamic_tls",

python/datafusion/plan.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ def to_proto(self) -> bytes:
9898
"""
9999
return self._raw_plan.to_proto()
100100

101+
def __eq__(self, other: LogicalPlan) -> bool:
102+
"""Test equality."""
103+
if not isinstance(other, LogicalPlan):
104+
return False
105+
return self._raw_plan.__eq__(other._raw_plan)
106+
101107

102108
class ExecutionPlan:
103109
"""Represent nodes in the DataFusion Physical Plan."""

python/datafusion/substrait.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,26 @@ def encode(self) -> bytes:
6767
"""
6868
return self.plan_internal.encode()
6969

70+
def to_json(self) -> str:
71+
"""Get the JSON representation of the Substrait plan.
72+
73+
Returns:
74+
A JSON representation of the Substrait plan.
75+
"""
76+
return self.plan_internal.to_json()
77+
78+
@staticmethod
79+
def from_json(json: str) -> Plan:
80+
"""Parse a plan from a JSON string representation.
81+
82+
Args:
83+
json: JSON representation of a Substrait plan.
84+
85+
Returns:
86+
Plan object representing the Substrait plan.
87+
"""
88+
return Plan(substrait_internal.Plan.from_json(json))
89+
7090

7191
@deprecated("Use `Plan` instead.")
7292
class plan(Plan): # noqa: N801

python/tests/test_substrait.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,76 @@ def test_substrait_file_serialization(ctx, tmp_path, path_to_str):
7474
expected_actual_plan = ss.Consumer.from_substrait_plan(ctx, actual_plan)
7575

7676
assert str(expected_logical_plan) == str(expected_actual_plan)
77+
78+
79+
def test_json_processing_round_trip(ctx: SessionContext):
80+
ctx.register_record_batches("t", [[pa.record_batch({"a": [1]})]])
81+
original_logical_plan = ctx.sql("SELECT * FROM t").logical_plan()
82+
83+
substrait_plan = ss.Producer.to_substrait_plan(original_logical_plan, ctx)
84+
json_plan = substrait_plan.to_json()
85+
86+
expected = """\
87+
"relations": [
88+
{
89+
"root": {
90+
"input": {
91+
"project": {
92+
"common": {
93+
"emit": {
94+
"outputMapping": [
95+
1
96+
]
97+
}
98+
},
99+
"input": {
100+
"read": {
101+
"baseSchema": {
102+
"names": [
103+
"a"
104+
],
105+
"struct": {
106+
"types": [
107+
{
108+
"i64": {
109+
"nullability": "NULLABILITY_NULLABLE"
110+
}
111+
}
112+
],
113+
"nullability": "NULLABILITY_REQUIRED"
114+
}
115+
},
116+
"namedTable": {
117+
"names": [
118+
"t"
119+
]
120+
}
121+
}
122+
},
123+
"expressions": [
124+
{
125+
"selection": {
126+
"directReference": {
127+
"structField": {}
128+
},
129+
"rootReference": {}
130+
}
131+
}
132+
]
133+
}
134+
},
135+
"names": [
136+
"a"
137+
]
138+
}
139+
}
140+
]"""
141+
142+
assert expected in json_plan
143+
144+
round_trip_substrait_plan = ss.Plan.from_json(json_plan)
145+
round_trip_logical_plan = ss.Consumer.from_substrait_plan(
146+
ctx, round_trip_substrait_plan
147+
)
148+
149+
assert round_trip_logical_plan == original_logical_plan

src/sql/logical.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ use crate::expr::unnest::PyUnnest;
6666
use crate::expr::values::PyValues;
6767
use crate::expr::window::PyWindowExpr;
6868

69-
#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass)]
70-
#[derive(Debug, Clone)]
69+
#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass, eq)]
70+
#[derive(Debug, Clone, PartialEq, Eq)]
7171
pub struct PyLogicalPlan {
7272
pub(crate) plan: Arc<LogicalPlan>,
7373
}

src/substrait.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use pyo3::prelude::*;
2323
use pyo3::types::PyBytes;
2424

2525
use crate::context::PySessionContext;
26-
use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
26+
use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err, to_datafusion_err};
2727
use crate::sql::logical::PyLogicalPlan;
2828
use crate::utils::wait_for_future;
2929

@@ -42,6 +42,19 @@ impl PyPlan {
4242
.map_err(PyDataFusionError::EncodeError)?;
4343
Ok(PyBytes::new(py, &proto_bytes).into())
4444
}
45+
46+
/// Get the JSON representation of the substrait plan
47+
fn to_json(&self) -> PyDataFusionResult<String> {
48+
let json = serde_json::to_string_pretty(&self.plan).map_err(to_datafusion_err)?;
49+
Ok(json)
50+
}
51+
52+
/// Parse a Substrait Plan from its JSON representation
53+
#[staticmethod]
54+
fn from_json(json: &str) -> PyDataFusionResult<PyPlan> {
55+
let plan: Plan = serde_json::from_str(json).map_err(to_datafusion_err)?;
56+
Ok(PyPlan { plan })
57+
}
4558
}
4659

4760
impl From<PyPlan> for Plan {

0 commit comments

Comments
 (0)