Skip to content

Commit 19e4dde

Browse files
committed
WIP on execution plan
1 parent 4237398 commit 19e4dde

File tree

3 files changed

+147
-32
lines changed

3 files changed

+147
-32
lines changed

src/ffi/execution_plan.rs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,105 @@ use std::{
33
sync::Arc,
44
};
55

6-
use datafusion::physical_plan::ExecutionPlan;
6+
use datafusion::{physical_expr::{EquivalenceProperties, LexOrdering}, physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties}};
77

88
#[repr(C)]
99
#[derive(Debug)]
1010
#[allow(missing_docs)]
1111
#[allow(non_camel_case_types)]
1212
pub struct FFI_ExecutionPlan {
13+
pub properties: Option<unsafe extern "C" fn(provider: *const FFI_ExecutionPlan) -> FFI_ArrowSchema>,
14+
1315
pub private_data: *mut c_void,
1416
}
1517

1618
unsafe impl Send for FFI_ExecutionPlan {}
19+
unsafe impl Sync for FFI_ExecutionPlan {}
1720

1821
pub struct ExecutionPlanPrivateData {
1922
pub plan: Arc<dyn ExecutionPlan + Send>,
2023
pub last_error: Option<CString>,
2124
}
25+
26+
struct ExportedExecutionPlan(*const FFI_ExecutionPlan);
27+
28+
impl FFI_ExecutionPlan {
29+
30+
pub fn empty() -> Self {
31+
Self {
32+
private_data: std::ptr::null_mut(),
33+
}
34+
}
35+
}
36+
37+
impl FFI_ExecutionPlan {
38+
pub fn new(plan: Arc<dyn ExecutionPlan + Send>) -> Self {
39+
let private_data = Box::new(ExecutionPlanPrivateData {
40+
plan,
41+
last_error: None,
42+
});
43+
44+
Self {
45+
private_data: Box::into_raw(private_data) as *mut c_void
46+
}
47+
}
48+
}
49+
50+
impl ExecutionPlan for FFI_ExecutionPlan {
51+
fn name(&self) -> &str {
52+
todo!()
53+
}
54+
55+
fn as_any(&self) -> &dyn std::any::Any {
56+
todo!()
57+
}
58+
59+
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
60+
self.properties
61+
}
62+
63+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
64+
todo!()
65+
}
66+
67+
fn with_new_children(
68+
self: Arc<Self>,
69+
children: Vec<Arc<dyn ExecutionPlan>>,
70+
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
71+
todo!()
72+
}
73+
74+
fn execute(
75+
&self,
76+
partition: usize,
77+
context: Arc<datafusion::execution::TaskContext>,
78+
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
79+
todo!()
80+
}
81+
}
82+
83+
impl DisplayAs for FFI_ExecutionPlan {
84+
fn fmt_as(&self, t: datafusion::physical_plan::DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
85+
todo!()
86+
}
87+
}
88+
89+
90+
91+
92+
93+
94+
#[repr(C)]
95+
#[derive(Debug)]
96+
#[allow(missing_docs)]
97+
#[allow(non_camel_case_types)]
98+
pub struct FFI_PlanProperties {
99+
/// See [ExecutionPlanProperties::equivalence_properties]
100+
pub eq_properties: EquivalenceProperties,
101+
/// See [ExecutionPlanProperties::output_partitioning]
102+
pub partitioning: Partitioning,
103+
/// See [ExecutionPlanProperties::execution_mode]
104+
pub execution_mode: ExecutionMode,
105+
/// See [ExecutionPlanProperties::output_ordering]
106+
output_ordering: Option<LexOrdering>,
107+
}

src/ffi/session_config.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use std::ffi::{c_void, CString};
1+
use std::{ffi::{c_void, CString}, sync::Arc};
22

3-
use datafusion::prelude::SessionConfig;
3+
use datafusion::{catalog::Session, prelude::SessionConfig};
44

55
#[repr(C)]
66
#[derive(Debug)]
@@ -28,3 +28,19 @@ impl ExportedSessionConfig {
2828
unsafe { &mut *((*self.session).private_data as *mut SessionConfigPrivateData) }
2929
}
3030
}
31+
32+
impl FFI_SessionConfig {
33+
/// Creates a new [`FFI_TableProvider`].
34+
pub fn new(session: &dyn Session) -> Self {
35+
let config = session.config().clone();
36+
let private_data = Box::new(SessionConfigPrivateData {
37+
config,
38+
last_error: None,
39+
});
40+
41+
Self {
42+
version: 2,
43+
private_data: Box::into_raw(private_data) as *mut c_void,
44+
}
45+
}
46+
}

src/ffi/table_provider.rs

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ pub struct FFI_TableProvider {
3737
pub schema: Option<unsafe extern "C" fn(provider: *const FFI_TableProvider) -> FFI_ArrowSchema>,
3838
pub scan: Option<
3939
unsafe extern "C" fn(
40-
provider: *mut FFI_TableProvider,
41-
session_config: *mut FFI_SessionConfig,
40+
provider: *const FFI_TableProvider,
41+
session_config: *const FFI_SessionConfig,
4242
n_projections: c_int,
43-
projections: *mut c_int,
43+
projections: *const c_int,
4444
n_filters: c_int,
45-
filters: *mut *const c_char,
45+
filters: *const *const c_char,
4646
limit: c_int,
4747
out: *mut FFI_ExecutionPlan,
4848
) -> c_int,
@@ -58,25 +58,20 @@ struct ProviderPrivateData {
5858
last_error: Option<CString>,
5959
}
6060

61-
struct ExportedTableProvider {
62-
provider: *mut FFI_TableProvider,
63-
}
64-
struct ConstExportedTableProvider {
65-
provider: *const FFI_TableProvider,
66-
}
61+
struct ExportedTableProvider(*const FFI_TableProvider);
6762

6863
// The callback used to get array schema
6964
unsafe extern "C" fn provider_schema(provider: *const FFI_TableProvider) -> FFI_ArrowSchema {
70-
ConstExportedTableProvider { provider }.provider_schema()
65+
ExportedTableProvider(provider).provider_schema()
7166
}
7267

7368
unsafe extern "C" fn provider_scan(
74-
provider: *mut FFI_TableProvider,
75-
session_config: *mut FFI_SessionConfig,
69+
provider: *const FFI_TableProvider,
70+
session_config: *const FFI_SessionConfig,
7671
n_projections: c_int,
77-
projections: *mut c_int,
72+
projections: *const c_int,
7873
n_filters: c_int,
79-
filters: *mut *const c_char,
74+
filters: *const *const c_char,
8075
limit: c_int,
8176
mut out: *mut FFI_ExecutionPlan,
8277
) -> c_int {
@@ -104,7 +99,7 @@ unsafe extern "C" fn provider_scan(
10499

105100
let limit = limit.try_into().ok();
106101

107-
let plan = ExportedTableProvider { provider }.provider_scan(
102+
let plan = ExportedTableProvider(provider).provider_scan(
108103
&session,
109104
maybe_projections,
110105
filters_vec,
@@ -120,9 +115,9 @@ unsafe extern "C" fn provider_scan(
120115
}
121116
}
122117

123-
impl ConstExportedTableProvider {
118+
impl ExportedTableProvider {
124119
fn get_private_data(&self) -> &ProviderPrivateData {
125-
unsafe { &*((*self.provider).private_data as *const ProviderPrivateData) }
120+
unsafe { &*((*self.0).private_data as *const ProviderPrivateData) }
126121
}
127122

128123
pub fn provider_schema(&self) -> FFI_ArrowSchema {
@@ -133,12 +128,6 @@ impl ConstExportedTableProvider {
133128
// so we expect it to always pass. Maybe some logging should be added.
134129
FFI_ArrowSchema::try_from(provider.schema().as_ref()).unwrap_or(FFI_ArrowSchema::empty())
135130
}
136-
}
137-
138-
impl ExportedTableProvider {
139-
fn get_private_data(&mut self) -> &mut ProviderPrivateData {
140-
unsafe { &mut *((*self.provider).private_data as *mut ProviderPrivateData) }
141-
}
142131

143132
pub fn provider_scan(
144133
&mut self,
@@ -250,22 +239,46 @@ impl TableProvider for FFI_TableProvider {
250239
/// parallelized or distributed.
251240
async fn scan(
252241
&self,
253-
_ctx: &dyn Session,
242+
session: &dyn Session,
254243
projection: Option<&Vec<usize>>,
255244
filters: &[Expr],
256245
// limit can be used to reduce the amount scanned
257246
// from the datasource as a performance optimization.
258247
// If set, it contains the amount of rows needed by the `LogicalPlan`,
259248
// The datasource should return *at least* this number of rows if available.
260-
_limit: Option<usize>,
249+
limit: Option<usize>,
261250
) -> Result<Arc<dyn ExecutionPlan>> {
262251
let scan_fn = self.scan.ok_or(DataFusionError::NotImplemented(
263252
"Scan not defined on FFI_TableProvider".to_string(),
264253
))?;
265254

266-
Err(datafusion::error::DataFusionError::NotImplemented(
267-
"scan not implemented".to_string(),
268-
))
255+
let session_config = FFI_SessionConfig::new(session);
256+
257+
let n_projections = projection.map(|p| p.len()).unwrap_or(0) as c_int;
258+
let projections: Vec<c_int> = projection.map(|p| p.iter().map(|v| *v as c_int).collect()).unwrap_or_default();
259+
let projections_ptr = projections.as_ptr();
260+
261+
let n_filters = filters.len() as c_int;
262+
let filters: Vec<CString> = filters.iter().filter_map(|f| CString::new(f.to_string()).ok()).collect();
263+
let filters_ptr: Vec<*const i8> = filters.iter()
264+
.map(|s| s.as_ptr())
265+
.collect();
266+
267+
let limit = match limit {
268+
Some(l) => l as c_int,
269+
None => -1,
270+
};
271+
272+
let mut out = FFI_ExecutionPlan::empty();
273+
274+
let err_code = unsafe {
275+
scan_fn(self, &session_config, n_projections, projections_ptr, n_filters, filters_ptr.as_ptr(), limit, &mut out)
276+
};
277+
278+
match err_code {
279+
0 => Ok(Arc::new(out)),
280+
_ => Err(datafusion::error::DataFusionError::Internal("Unable to perform scan via FFI".to_string()))
281+
}
269282
}
270283

271284
/// Tests whether the table provider can make use of a filter expression

0 commit comments

Comments
 (0)