@@ -37,12 +37,12 @@ pub struct FFI_TableProvider {
3737 pub schema : Option < unsafe extern "C" fn ( provider : * const FFI_TableProvider ) -> FFI_ArrowSchema > ,
3838 pub scan : Option <
3939 unsafe extern "C" fn (
40- provider : * mut FFI_TableProvider ,
41- session_config : * mut FFI_SessionConfig ,
40+ provider : * const FFI_TableProvider ,
41+ session_config : * const FFI_SessionConfig ,
4242 n_projections : c_int ,
43- projections : * mut c_int ,
43+ projections : * const c_int ,
4444 n_filters : c_int ,
45- filters : * mut * const c_char ,
45+ filters : * const * const c_char ,
4646 limit : c_int ,
4747 out : * mut FFI_ExecutionPlan ,
4848 ) -> c_int ,
@@ -58,25 +58,20 @@ struct ProviderPrivateData {
5858 last_error : Option < CString > ,
5959}
6060
61- struct ExportedTableProvider {
62- provider : * mut FFI_TableProvider ,
63- }
64- struct ConstExportedTableProvider {
65- provider : * const FFI_TableProvider ,
66- }
61+ struct ExportedTableProvider ( * const FFI_TableProvider ) ;
6762
6863// The callback used to get array schema
6964unsafe extern "C" fn provider_schema ( provider : * const FFI_TableProvider ) -> FFI_ArrowSchema {
70- ConstExportedTableProvider { provider } . provider_schema ( )
65+ ExportedTableProvider ( provider) . provider_schema ( )
7166}
7267
7368unsafe extern "C" fn provider_scan (
74- provider : * mut FFI_TableProvider ,
75- session_config : * mut FFI_SessionConfig ,
69+ provider : * const FFI_TableProvider ,
70+ session_config : * const FFI_SessionConfig ,
7671 n_projections : c_int ,
77- projections : * mut c_int ,
72+ projections : * const c_int ,
7873 n_filters : c_int ,
79- filters : * mut * const c_char ,
74+ filters : * const * const c_char ,
8075 limit : c_int ,
8176 mut out : * mut FFI_ExecutionPlan ,
8277) -> c_int {
@@ -104,7 +99,7 @@ unsafe extern "C" fn provider_scan(
10499
105100 let limit = limit. try_into ( ) . ok ( ) ;
106101
107- let plan = ExportedTableProvider { provider } . provider_scan (
102+ let plan = ExportedTableProvider ( provider) . provider_scan (
108103 & session,
109104 maybe_projections,
110105 filters_vec,
@@ -120,9 +115,9 @@ unsafe extern "C" fn provider_scan(
120115 }
121116}
122117
123- impl ConstExportedTableProvider {
118+ impl ExportedTableProvider {
124119 fn get_private_data ( & self ) -> & ProviderPrivateData {
125- unsafe { & * ( ( * self . provider ) . private_data as * const ProviderPrivateData ) }
120+ unsafe { & * ( ( * self . 0 ) . private_data as * const ProviderPrivateData ) }
126121 }
127122
128123 pub fn provider_schema ( & self ) -> FFI_ArrowSchema {
@@ -133,12 +128,6 @@ impl ConstExportedTableProvider {
133128 // so we expect it to always pass. Maybe some logging should be added.
134129 FFI_ArrowSchema :: try_from ( provider. schema ( ) . as_ref ( ) ) . unwrap_or ( FFI_ArrowSchema :: empty ( ) )
135130 }
136- }
137-
138- impl ExportedTableProvider {
139- fn get_private_data ( & mut self ) -> & mut ProviderPrivateData {
140- unsafe { & mut * ( ( * self . provider ) . private_data as * mut ProviderPrivateData ) }
141- }
142131
143132 pub fn provider_scan (
144133 & mut self ,
@@ -250,22 +239,46 @@ impl TableProvider for FFI_TableProvider {
250239 /// parallelized or distributed.
251240 async fn scan (
252241 & self ,
253- _ctx : & dyn Session ,
242+ session : & dyn Session ,
254243 projection : Option < & Vec < usize > > ,
255244 filters : & [ Expr ] ,
256245 // limit can be used to reduce the amount scanned
257246 // from the datasource as a performance optimization.
258247 // If set, it contains the amount of rows needed by the `LogicalPlan`,
259248 // The datasource should return *at least* this number of rows if available.
260- _limit : Option < usize > ,
249+ limit : Option < usize > ,
261250 ) -> Result < Arc < dyn ExecutionPlan > > {
262251 let scan_fn = self . scan . ok_or ( DataFusionError :: NotImplemented (
263252 "Scan not defined on FFI_TableProvider" . to_string ( ) ,
264253 ) ) ?;
265254
266- Err ( datafusion:: error:: DataFusionError :: NotImplemented (
267- "scan not implemented" . to_string ( ) ,
268- ) )
255+ let session_config = FFI_SessionConfig :: new ( session) ;
256+
257+ let n_projections = projection. map ( |p| p. len ( ) ) . unwrap_or ( 0 ) as c_int ;
258+ let projections: Vec < c_int > = projection. map ( |p| p. iter ( ) . map ( |v| * v as c_int ) . collect ( ) ) . unwrap_or_default ( ) ;
259+ let projections_ptr = projections. as_ptr ( ) ;
260+
261+ let n_filters = filters. len ( ) as c_int ;
262+ let filters: Vec < CString > = filters. iter ( ) . filter_map ( |f| CString :: new ( f. to_string ( ) ) . ok ( ) ) . collect ( ) ;
263+ let filters_ptr: Vec < * const i8 > = filters. iter ( )
264+ . map ( |s| s. as_ptr ( ) )
265+ . collect ( ) ;
266+
267+ let limit = match limit {
268+ Some ( l) => l as c_int ,
269+ None => -1 ,
270+ } ;
271+
272+ let mut out = FFI_ExecutionPlan :: empty ( ) ;
273+
274+ let err_code = unsafe {
275+ scan_fn ( self , & session_config, n_projections, projections_ptr, n_filters, filters_ptr. as_ptr ( ) , limit, & mut out)
276+ } ;
277+
278+ match err_code {
279+ 0 => Ok ( Arc :: new ( out) ) ,
280+ _ => Err ( datafusion:: error:: DataFusionError :: Internal ( "Unable to perform scan via FFI" . to_string ( ) ) )
281+ }
269282 }
270283
271284 /// Tests whether the table provider can make use of a filter expression
0 commit comments