Skip to content

Commit c752b3d

Browse files
committed
Working through execution plan FFI
1 parent 19e4dde commit c752b3d

File tree

4 files changed

+173
-41
lines changed

4 files changed

+173
-41
lines changed

src/context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ use datafusion::datasource::listing::{
5959
};
6060
use datafusion::datasource::TableProvider;
6161
use datafusion::datasource::{provider, MemTable};
62-
use datafusion::execution::context::{DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext};
62+
use datafusion::execution::context::{
63+
DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext,
64+
};
6365
use datafusion::execution::disk_manager::DiskManagerConfig;
6466
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
6567
use datafusion::execution::options::ReadOptions;

src/ffi/execution_plan.rs

Lines changed: 132 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,84 @@
11
use std::{
22
ffi::{c_void, CString},
3+
ptr::null,
34
sync::Arc,
45
};
56

6-
use datafusion::{physical_expr::{EquivalenceProperties, LexOrdering}, physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties}};
7+
use datafusion::error::Result;
8+
use datafusion::{
9+
error::DataFusionError,
10+
parquet::file::properties,
11+
physical_expr::{EquivalenceProperties, LexOrdering},
12+
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties},
13+
};
714

815
#[repr(C)]
916
#[derive(Debug)]
1017
#[allow(missing_docs)]
1118
#[allow(non_camel_case_types)]
1219
pub struct FFI_ExecutionPlan {
13-
pub properties: Option<unsafe extern "C" fn(provider: *const FFI_ExecutionPlan) -> FFI_ArrowSchema>,
20+
pub properties:
21+
Option<unsafe extern "C" fn(plan: *const FFI_ExecutionPlan) -> FFI_PlanProperties>,
22+
pub children: Option<
23+
unsafe extern "C" fn(
24+
plan: *const FFI_ExecutionPlan,
25+
num_children: &mut usize,
26+
out: &mut *const FFI_ExecutionPlan,
27+
) -> i32,
28+
>,
1429

1530
pub private_data: *mut c_void,
1631
}
1732

18-
unsafe impl Send for FFI_ExecutionPlan {}
19-
unsafe impl Sync for FFI_ExecutionPlan {}
20-
2133
pub struct ExecutionPlanPrivateData {
2234
pub plan: Arc<dyn ExecutionPlan + Send>,
2335
pub last_error: Option<CString>,
2436
}
2537

26-
struct ExportedExecutionPlan(*const FFI_ExecutionPlan);
38+
unsafe extern "C" fn properties_fn_wrapper(plan: *const FFI_ExecutionPlan) -> FFI_PlanProperties {
39+
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
40+
let properties = (*private_data).plan.properties();
41+
properties.into()
42+
}
2743

28-
impl FFI_ExecutionPlan {
44+
unsafe extern "C" fn children_fn_wrapper(
45+
plan: *const FFI_ExecutionPlan,
46+
num_children: &mut usize,
47+
out: &mut *const FFI_ExecutionPlan,
48+
) -> i32 {
49+
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
50+
51+
let children = (*private_data).plan.children();
52+
*num_children = children.len();
53+
let children: Vec<FFI_ExecutionPlan> = children
54+
.into_iter()
55+
.map(|child| FFI_ExecutionPlan::new(child.clone()))
56+
.collect();
57+
*out = children.as_ptr();
58+
59+
0
60+
}
2961

30-
pub fn empty() -> Self {
31-
Self {
32-
private_data: std::ptr::null_mut(),
33-
}
62+
// Since the trait ExecutionPlan requires borrowed values, we wrap our FFI.
63+
// This struct exists on the consumer side (datafusion-python, for example) and not
64+
// on the provider's side.
65+
#[derive(Debug)]
66+
pub struct ExportedExecutionPlan {
67+
plan: *const FFI_ExecutionPlan,
68+
properties: PlanProperties,
69+
children: Vec<Arc<dyn ExecutionPlan>>,
70+
}
71+
72+
unsafe impl Send for ExportedExecutionPlan {}
73+
unsafe impl Sync for ExportedExecutionPlan {}
74+
75+
impl DisplayAs for ExportedExecutionPlan {
76+
fn fmt_as(
77+
&self,
78+
t: datafusion::physical_plan::DisplayFormatType,
79+
f: &mut std::fmt::Formatter,
80+
) -> std::fmt::Result {
81+
todo!()
3482
}
3583
}
3684

@@ -42,12 +90,63 @@ impl FFI_ExecutionPlan {
4290
});
4391

4492
Self {
45-
private_data: Box::into_raw(private_data) as *mut c_void
93+
properties: Some(properties_fn_wrapper),
94+
children: Some(children_fn_wrapper),
95+
private_data: Box::into_raw(private_data) as *mut c_void,
96+
}
97+
}
98+
99+
pub fn empty() -> Self {
100+
Self {
101+
properties: None,
102+
children: None,
103+
private_data: std::ptr::null_mut(),
46104
}
47105
}
48106
}
49107

50-
impl ExecutionPlan for FFI_ExecutionPlan {
108+
impl ExportedExecutionPlan {
109+
pub fn new(plan: *const FFI_ExecutionPlan) -> Result<Self> {
110+
let properties = unsafe {
111+
let properties_fn = (*plan).properties.ok_or(DataFusionError::NotImplemented(
112+
"properties not implemented on FFI_ExecutionPlan".to_string(),
113+
))?;
114+
properties_fn(plan).into()
115+
};
116+
117+
let children = unsafe {
118+
let children_fn = (*plan).children.ok_or(DataFusionError::NotImplemented(
119+
"children not implemented on FFI_ExecutionPlan".to_string(),
120+
))?;
121+
let mut num_children = 0;
122+
let mut children_ptr: *const FFI_ExecutionPlan = null();
123+
124+
if children_fn(plan, &mut num_children, &mut children_ptr) != 0 {
125+
return Err(DataFusionError::Plan(
126+
"Error getting children for FFI_ExecutionPlan".to_string(),
127+
));
128+
}
129+
130+
let ffi_vec = Vec::from_raw_parts(&mut children_ptr, num_children, num_children);
131+
let maybe_children: Result<Vec<_>> = ffi_vec
132+
.into_iter()
133+
.map(|child| {
134+
ExportedExecutionPlan::new(child).map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
135+
})
136+
.collect();
137+
138+
maybe_children?
139+
};
140+
141+
Ok(Self {
142+
plan,
143+
properties,
144+
children,
145+
})
146+
}
147+
}
148+
149+
impl ExecutionPlan for ExportedExecutionPlan {
51150
fn name(&self) -> &str {
52151
todo!()
53152
}
@@ -57,11 +156,11 @@ impl ExecutionPlan for FFI_ExecutionPlan {
57156
}
58157

59158
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
60-
self.properties
159+
&self.properties
61160
}
62161

63162
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
64-
todo!()
163+
self.children.iter().collect()
65164
}
66165

67166
fn with_new_children(
@@ -81,16 +180,15 @@ impl ExecutionPlan for FFI_ExecutionPlan {
81180
}
82181

83182
impl DisplayAs for FFI_ExecutionPlan {
84-
fn fmt_as(&self, t: datafusion::physical_plan::DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
183+
fn fmt_as(
184+
&self,
185+
t: datafusion::physical_plan::DisplayFormatType,
186+
f: &mut std::fmt::Formatter,
187+
) -> std::fmt::Result {
85188
todo!()
86189
}
87190
}
88191

89-
90-
91-
92-
93-
94192
#[repr(C)]
95193
#[derive(Debug)]
96194
#[allow(missing_docs)]
@@ -104,4 +202,16 @@ pub struct FFI_PlanProperties {
104202
pub execution_mode: ExecutionMode,
105203
/// See [ExecutionPlanProperties::output_ordering]
106204
output_ordering: Option<LexOrdering>,
107-
}
205+
}
206+
207+
impl From<&PlanProperties> for FFI_PlanProperties {
208+
fn from(value: &PlanProperties) -> Self {
209+
todo!()
210+
}
211+
}
212+
213+
impl From<FFI_PlanProperties> for PlanProperties {
214+
fn from(value: FFI_PlanProperties) -> Self {
215+
todo!()
216+
}
217+
}

src/ffi/session_config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{ffi::{c_void, CString}, sync::Arc};
1+
use std::{
2+
ffi::{c_void, CString},
3+
sync::Arc,
4+
};
25

36
use datafusion::{catalog::Session, prelude::SessionConfig};
47

@@ -43,4 +46,4 @@ impl FFI_SessionConfig {
4346
private_data: Box::into_raw(private_data) as *mut c_void,
4447
}
4548
}
46-
}
49+
}

src/ffi/table_provider.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion::{
2323
use tokio::runtime::Runtime;
2424

2525
use super::{
26-
execution_plan::{ExecutionPlanPrivateData, FFI_ExecutionPlan},
26+
execution_plan::{ExecutionPlanPrivateData, ExportedExecutionPlan, FFI_ExecutionPlan},
2727
session_config::{FFI_SessionConfig, SessionConfigPrivateData},
2828
};
2929
use datafusion::error::Result;
@@ -150,14 +150,15 @@ impl ExportedTableProvider {
150150
let runtime = Runtime::new().unwrap();
151151
let plan = runtime.block_on(provider.scan(session, projections, &filter_exprs, limit))?;
152152

153-
let plan_ptr = Box::new(ExecutionPlanPrivateData {
154-
plan,
155-
last_error: None,
156-
});
153+
// let plan_ptr = Box::new(ExecutionPlanPrivateData {
154+
// plan,
155+
// last_error: None,
156+
// });
157157

158-
Ok(FFI_ExecutionPlan {
159-
private_data: Box::into_raw(plan_ptr) as *mut c_void,
160-
})
158+
// Ok(FFI_ExecutionPlan {
159+
// private_data: Box::into_raw(plan_ptr) as *mut c_void,
160+
// })
161+
Ok(FFI_ExecutionPlan::new(plan))
161162
}
162163
}
163164

@@ -255,14 +256,17 @@ impl TableProvider for FFI_TableProvider {
255256
let session_config = FFI_SessionConfig::new(session);
256257

257258
let n_projections = projection.map(|p| p.len()).unwrap_or(0) as c_int;
258-
let projections: Vec<c_int> = projection.map(|p| p.iter().map(|v| *v as c_int).collect()).unwrap_or_default();
259+
let projections: Vec<c_int> = projection
260+
.map(|p| p.iter().map(|v| *v as c_int).collect())
261+
.unwrap_or_default();
259262
let projections_ptr = projections.as_ptr();
260263

261264
let n_filters = filters.len() as c_int;
262-
let filters: Vec<CString> = filters.iter().filter_map(|f| CString::new(f.to_string()).ok()).collect();
263-
let filters_ptr: Vec<*const i8> = filters.iter()
264-
.map(|s| s.as_ptr())
265+
let filters: Vec<CString> = filters
266+
.iter()
267+
.filter_map(|f| CString::new(f.to_string()).ok())
265268
.collect();
269+
let filters_ptr: Vec<*const i8> = filters.iter().map(|s| s.as_ptr()).collect();
266270

267271
let limit = match limit {
268272
Some(l) => l as c_int,
@@ -272,13 +276,26 @@ impl TableProvider for FFI_TableProvider {
272276
let mut out = FFI_ExecutionPlan::empty();
273277

274278
let err_code = unsafe {
275-
scan_fn(self, &session_config, n_projections, projections_ptr, n_filters, filters_ptr.as_ptr(), limit, &mut out)
279+
scan_fn(
280+
self,
281+
&session_config,
282+
n_projections,
283+
projections_ptr,
284+
n_filters,
285+
filters_ptr.as_ptr(),
286+
limit,
287+
&mut out,
288+
)
276289
};
277290

278-
match err_code {
279-
0 => Ok(Arc::new(out)),
280-
_ => Err(datafusion::error::DataFusionError::Internal("Unable to perform scan via FFI".to_string()))
291+
if 0 != err_code {
292+
return Err(datafusion::error::DataFusionError::Internal(
293+
"Unable to perform scan via FFI".to_string(),
294+
));
281295
}
296+
297+
let plan = ExportedExecutionPlan::new(&out)?;
298+
Ok(Arc::new(plan))
282299
}
283300

284301
/// Tests whether the table provider can make use of a filter expression

0 commit comments

Comments
 (0)