Skip to content

Commit f25a7fc

Browse files
committed
feat: add examples BlockingTableProvider with scan functionality and corresponding tests
1 parent d17ca68 commit f25a7fc

File tree

3 files changed

+80
-0
lines changed

3 files changed

+80
-0
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from datafusion import SessionContext
2+
from datafusion_ffi_example import BlockingTableProvider
3+
4+
5+
def test_runtime_available_in_scan() -> None:
6+
ctx = SessionContext()
7+
provider = BlockingTableProvider()
8+
ctx.register_table_provider("t", provider)
9+
result = ctx.table("t").collect()
10+
assert [b.column(0).to_pylist() for b in result] == [[1, 2, 3]]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
2+
use arrow_schema::{DataType, Field, Schema, SchemaRef};
3+
use datafusion_catalog::{MemTable, Session, TableProvider, TableType};
4+
use datafusion_common::Result as DataFusionResult;
5+
use datafusion_expr::Expr;
6+
use datafusion_ffi::table_provider::FFI_TableProvider;
7+
use datafusion_physical_plan::ExecutionPlan;
8+
use pyo3::types::PyCapsule;
9+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
10+
use std::any::Any;
11+
use std::sync::Arc;
12+
13+
#[pyclass(
14+
name = "BlockingTableProvider",
15+
module = "datafusion_ffi_example",
16+
subclass
17+
)]
18+
#[derive(Clone)]
19+
pub(crate) struct BlockingTableProvider;
20+
21+
#[pymethods]
22+
impl BlockingTableProvider {
23+
#[new]
24+
fn new() -> Self {
25+
Self
26+
}
27+
28+
fn __datafusion_table_provider__<'py>(
29+
&self,
30+
py: Python<'py>,
31+
) -> PyResult<Bound<'py, PyCapsule>> {
32+
let name = cr"datafusion_table_provider".into();
33+
let provider = FFI_TableProvider::new(Arc::new(self.clone()), false, None);
34+
PyCapsule::new(py, provider, Some(name))
35+
}
36+
}
37+
38+
#[async_trait::async_trait]
39+
impl TableProvider for BlockingTableProvider {
40+
fn as_any(&self) -> &dyn Any {
41+
self
42+
}
43+
44+
fn schema(&self) -> SchemaRef {
45+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
46+
}
47+
48+
fn table_type(&self) -> TableType {
49+
TableType::Base
50+
}
51+
52+
async fn scan(
53+
&self,
54+
state: &dyn Session,
55+
projection: Option<&Vec<usize>>,
56+
_filters: &[Expr],
57+
_limit: Option<usize>,
58+
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
59+
tokio::spawn_blocking(|| ()).await.unwrap();
60+
let batch = RecordBatch::try_new(
61+
self.schema(),
62+
vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
63+
)?;
64+
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
65+
table.scan(state, projection, &[], None).await
66+
}
67+
}

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,19 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::blocking_provider::BlockingTableProvider;
1819
use crate::table_function::MyTableFunction;
1920
use crate::table_provider::MyTableProvider;
2021
use pyo3::prelude::*;
2122

23+
pub(crate) mod blocking_provider;
2224
pub(crate) mod table_function;
2325
pub(crate) mod table_provider;
2426

2527
#[pymodule]
2628
fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
2729
m.add_class::<MyTableProvider>()?;
30+
m.add_class::<BlockingTableProvider>()?;
2831
m.add_class::<MyTableFunction>()?;
2932
Ok(())
3033
}

0 commit comments

Comments
 (0)