Skip to content

Commit f830058

Browse files
committed
Expand file structure
1 parent 7a476d2 commit f830058

File tree

5 files changed

+89
-95
lines changed

5 files changed

+89
-95
lines changed

src/context.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::dataset::Dataset;
3838
use crate::errors::{py_datafusion_err, DataFusionError};
3939
use crate::expr::sort_expr::PySortExpr;
4040
use crate::expr::PyExpr;
41-
use crate::ffi::FFI_TableProvider;
41+
use crate::ffi::table_provider::FFI_TableProvider;
4242
use crate::physical_plan::PyExecutionPlan;
4343
use crate::record_batch::PyRecordBatchStream;
4444
use crate::sql::logical::PyLogicalPlan;
@@ -579,26 +579,14 @@ impl PySessionContext {
579579
let capsule = capsule.downcast::<PyCapsule>()?;
580580
// validate_pycapsule(capsule, "arrow_array_stream")?;
581581

582-
let mut provider = unsafe { FFI_TableProvider::from_raw(capsule.pointer() as _) };
582+
let provider = unsafe { FFI_TableProvider::from_raw(capsule.pointer() as _) };
583583

584584
println!("Found provider version {}", provider.version);
585585

586586
let schema = provider.schema();
587587
println!("Got schema through TableProvider trait {}", schema);
588588

589-
// if let Some(s) = provider.schema {
590-
// let mut schema = s(provider);
591-
592-
// if ret_code == 0 {
593-
// let schema = Schema::try_from(&schema)
594-
// .map_err(|e| PyValueError::new_err(e.to_string()))?;
595-
// println!("got schema {}", schema);
596-
// } else {
597-
// return Err(PyValueError::new_err(format!(
598-
// "Cannot get schema from input stream. Error code: {ret_code:?}"
599-
// )));
600-
// }
601-
// }
589+
let _ = self.ctx.register_table(name, Arc::new(provider))?;
602590
}
603591
Ok(())
604592
}

src/ffi/execution_plan.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use std::{
2+
ffi::{c_void, CString},
3+
sync::Arc,
4+
};
5+
6+
use datafusion::physical_plan::ExecutionPlan;
7+
8+
#[repr(C)]
9+
#[derive(Debug)]
10+
#[allow(missing_docs)]
11+
#[allow(non_camel_case_types)]
12+
pub struct FFI_ExecutionPlan {
13+
pub private_data: *mut c_void,
14+
}
15+
16+
unsafe impl Send for FFI_ExecutionPlan {}
17+
18+
pub struct ExecutionPlanPrivateData {
19+
pub plan: Arc<dyn ExecutionPlan + Send>,
20+
pub last_error: Option<CString>,
21+
}

src/ffi/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
pub mod execution_plan;
2+
pub mod session_config;
3+
pub mod table_provider;
4+
5+
#[repr(C)]
6+
#[derive(Debug)]
7+
#[allow(non_camel_case_types)]
8+
pub enum FFI_Constraint {
9+
/// Columns with the given indices form a composite primary key (they are
10+
/// jointly unique and not nullable):
11+
PrimaryKey(Vec<usize>),
12+
/// Columns with the given indices form a composite unique key:
13+
Unique(Vec<usize>),
14+
}
15+
16+
#[repr(C)]
17+
#[derive(Debug)]
18+
#[allow(missing_docs)]
19+
#[allow(non_camel_case_types)]
20+
pub struct FFI_Expr {}

src/ffi/session_config.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use std::ffi::{c_void, CString};
2+
3+
use datafusion::prelude::SessionConfig;
4+
5+
#[repr(C)]
6+
#[derive(Debug)]
7+
#[allow(missing_docs)]
8+
#[allow(non_camel_case_types)]
9+
pub struct FFI_SessionConfig {
10+
pub version: i64,
11+
12+
pub private_data: *mut c_void,
13+
}
14+
15+
unsafe impl Send for FFI_SessionConfig {}
16+
17+
pub struct SessionConfigPrivateData {
18+
pub config: SessionConfig,
19+
pub last_error: Option<CString>,
20+
}
21+
22+
struct ExportedSessionConfig {
23+
session: *mut FFI_SessionConfig,
24+
}
25+
26+
impl ExportedSessionConfig {
27+
fn get_private_data(&mut self) -> &mut SessionConfigPrivateData {
28+
unsafe { &mut *((*self.session).private_data as *mut SessionConfigPrivateData) }
29+
}
30+
}

src/ffi.rs renamed to src/ffi/table_provider.rs

Lines changed: 15 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
any::Any,
33
ffi::{c_char, c_int, c_void, CStr, CString},
4-
ptr::{addr_of, addr_of_mut},
54
sync::Arc,
65
};
76

@@ -14,73 +13,20 @@ use async_trait::async_trait;
1413
use datafusion::{
1514
catalog::{Session, TableProvider},
1615
common::DFSchema,
16+
datasource::TableType,
17+
error::DataFusionError,
1718
execution::{context::SessionState, session_state::SessionStateBuilder},
19+
logical_expr::TableProviderFilterPushDown,
1820
physical_plan::ExecutionPlan,
19-
prelude::{Expr, SessionConfig},
20-
};
21-
use datafusion::{
22-
common::Result, datasource::TableType, logical_expr::TableProviderFilterPushDown,
21+
prelude::Expr,
2322
};
2423
use tokio::runtime::Runtime;
2524

26-
#[repr(C)]
27-
#[derive(Debug)]
28-
#[allow(non_camel_case_types)]
29-
pub enum FFI_Constraint {
30-
/// Columns with the given indices form a composite primary key (they are
31-
/// jointly unique and not nullable):
32-
PrimaryKey(Vec<usize>),
33-
/// Columns with the given indices form a composite unique key:
34-
Unique(Vec<usize>),
35-
}
36-
37-
#[repr(C)]
38-
#[derive(Debug)]
39-
#[allow(missing_docs)]
40-
#[allow(non_camel_case_types)]
41-
pub struct FFI_ExecutionPlan {
42-
pub private_data: *mut c_void,
43-
}
44-
45-
unsafe impl Send for FFI_ExecutionPlan {}
46-
47-
struct ExecutionPlanPrivateData {
48-
plan: Arc<dyn ExecutionPlan + Send>,
49-
last_error: Option<CString>,
50-
}
51-
52-
#[repr(C)]
53-
#[derive(Debug)]
54-
#[allow(missing_docs)]
55-
#[allow(non_camel_case_types)]
56-
pub struct FFI_SessionConfig {
57-
pub version: i64,
58-
59-
pub private_data: *mut c_void,
60-
}
61-
62-
unsafe impl Send for FFI_SessionConfig {}
63-
64-
struct SessionConfigPrivateData {
65-
config: SessionConfig,
66-
last_error: Option<CString>,
67-
}
68-
69-
struct ExportedSessionConfig {
70-
session: *mut FFI_SessionConfig,
71-
}
72-
73-
impl ExportedSessionConfig {
74-
fn get_private_data(&mut self) -> &mut SessionConfigPrivateData {
75-
unsafe { &mut *((*self.session).private_data as *mut SessionConfigPrivateData) }
76-
}
77-
}
78-
79-
#[repr(C)]
80-
#[derive(Debug)]
81-
#[allow(missing_docs)]
82-
#[allow(non_camel_case_types)]
83-
pub struct FFI_Expr {}
25+
use super::{
26+
execution_plan::{ExecutionPlanPrivateData, FFI_ExecutionPlan},
27+
session_config::{FFI_SessionConfig, SessionConfigPrivateData},
28+
};
29+
use datafusion::error::Result;
8430

8531
#[repr(C)]
8632
#[derive(Debug)]
@@ -121,7 +67,6 @@ struct ConstExportedTableProvider {
12167

12268
// The callback used to get the provider's Arrow schema
12369
unsafe extern "C" fn provider_schema(provider: *const FFI_TableProvider) -> FFI_ArrowSchema {
124-
println!("callback function");
12570
ConstExportedTableProvider { provider }.provider_schema()
12671
}
12772

@@ -181,18 +126,12 @@ impl ConstExportedTableProvider {
181126
}
182127

183128
pub fn provider_schema(&self) -> FFI_ArrowSchema {
184-
println!("Enter exported table provider");
185129
let private_data = self.get_private_data();
186130
let provider = &private_data.provider;
187131

188-
println!("about to try from in provider.schema()");
189132
// A failed conversion silently falls back to an empty schema because TableProvider::schema
190133
// does not return a Result, so we expect it to always succeed. Maybe some logging should be added.
191-
let mut schema = FFI_ArrowSchema::try_from(provider.schema().as_ref())
192-
.unwrap_or(FFI_ArrowSchema::empty());
193-
194-
println!("Found the schema but can we return it?");
195-
schema
134+
FFI_ArrowSchema::try_from(provider.schema().as_ref()).unwrap_or(FFI_ArrowSchema::empty())
196135
}
197136
}
198137

@@ -294,15 +233,7 @@ impl TableProvider for FFI_TableProvider {
294233
/// Get a reference to the schema for this table
295234
fn schema(&self) -> SchemaRef {
296235
let schema = match self.schema {
297-
Some(func) => {
298-
println!("About to call the function to get the schema");
299-
unsafe {
300-
let v = func(self);
301-
println!("Got the mutalbe ffi_arrow_schmea?");
302-
// func(self).as_ref().and_then(|s| Schema::try_from(s).ok())
303-
Schema::try_from(&func(self)).ok()
304-
}
305-
}
236+
Some(func) => unsafe { Schema::try_from(&func(self)).ok() },
306237
None => None,
307238
};
308239
Arc::new(schema.unwrap_or(Schema::empty()))
@@ -328,6 +259,10 @@ impl TableProvider for FFI_TableProvider {
328259
// The datasource should return *at least* this number of rows if available.
329260
_limit: Option<usize>,
330261
) -> Result<Arc<dyn ExecutionPlan>> {
262+
let scan_fn = self.scan.ok_or(DataFusionError::NotImplemented(
263+
"Scan not defined on FFI_TableProvider".to_string(),
264+
))?;
265+
331266
Err(datafusion::error::DataFusionError::NotImplemented(
332267
"scan not implemented".to_string(),
333268
))

0 commit comments

Comments
 (0)