Skip to content

Commit 13c484e

Browse files
committed
refactor: remove Send marker from TableProvider trait objects for improved type flexibility
1 parent eb3304d commit 13c484e

File tree

6 files changed: 24 additions (+24) and 75 deletions (-75).

src/catalog.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ use crate::dataframe::PyDataFrame;
1919
use crate::dataset::Dataset;
2020
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
2121
use crate::table::PyTableProvider;
22-
use crate::utils::{
23-
table_provider_from_pycapsule, table_provider_send_to_table_provider, table_provider_to_send,
24-
validate_pycapsule, wait_for_future,
25-
};
22+
use crate::utils::{table_provider_from_pycapsule, validate_pycapsule, wait_for_future};
2623
use async_trait::async_trait;
2724
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
2825
use datafusion::common::DataFusionError;
@@ -55,7 +52,7 @@ pub struct PySchema {
5552
#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
5653
#[derive(Clone)]
5754
pub struct PyTable {
58-
pub table: Arc<dyn TableProvider + Send>,
55+
pub table: Arc<dyn TableProvider>,
5956
}
6057

6158
impl From<Arc<dyn CatalogProvider>> for PyCatalog {
@@ -71,11 +68,11 @@ impl From<Arc<dyn SchemaProvider>> for PySchema {
7168
}
7269

7370
impl PyTable {
74-
pub fn new(table: Arc<dyn TableProvider + Send>) -> Self {
71+
pub fn new(table: Arc<dyn TableProvider>) -> Self {
7572
Self { table }
7673
}
7774

78-
pub fn table(&self) -> Arc<dyn TableProvider + Send> {
75+
pub fn table(&self) -> Arc<dyn TableProvider> {
7976
self.table.clone()
8077
}
8178
}
@@ -215,7 +212,7 @@ impl PySchema {
215212
} else {
216213
let py = table_provider.py();
217214
let provider = Dataset::new(&table_provider, py)?;
218-
Arc::new(provider) as Arc<dyn TableProvider + Send>
215+
Arc::new(provider) as Arc<dyn TableProvider>
219216
};
220217

221218
let _ = self
@@ -295,7 +292,7 @@ impl RustWrappedPySchemaProvider {
295292
}
296293
}
297294

298-
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider + Send>>> {
295+
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider>>> {
299296
Python::with_gil(|py| {
300297
let provider = self.schema_provider.bind(py);
301298
let py_table_method = provider.getattr("table")?;
@@ -322,7 +319,7 @@ impl RustWrappedPySchemaProvider {
322319
Ok(py_table) => Ok(Some(py_table.table)),
323320
Err(_) => {
324321
let ds = Dataset::new(&py_table, py).map_err(py_datafusion_err)?;
325-
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider + Send>))
322+
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider>))
326323
}
327324
}
328325
}
@@ -358,22 +355,15 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
358355
&self,
359356
name: &str,
360357
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
361-
// Convert from our internal Send type to the trait expected type
362-
match self.table_inner(name).map_err(to_datafusion_err)? {
363-
Some(table) => Ok(Some(table_provider_send_to_table_provider(table))),
364-
None => Ok(None),
365-
}
358+
self.table_inner(name).map_err(to_datafusion_err)
366359
}
367360

368361
fn register_table(
369362
&self,
370363
name: String,
371364
table: Arc<dyn TableProvider>,
372365
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
373-
// Convert from trait type to our internal Send type
374-
let send_table = table_provider_to_send(table);
375-
376-
let py_table = PyTable::new(send_table);
366+
let py_table = PyTable::new(table);
377367
Python::with_gil(|py| {
378368
let provider = self.schema_provider.bind(py);
379369
let _ = provider
@@ -402,10 +392,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
402392
// If we can turn this table provider into a `Dataset`, return it.
403393
// Otherwise, return None.
404394
let dataset = match Dataset::new(&table, py) {
405-
Ok(dataset) => {
406-
let send_table = Arc::new(dataset) as Arc<dyn TableProvider + Send>;
407-
Some(table_provider_send_to_table_provider(send_table))
408-
}
395+
Ok(dataset) => Some(Arc::new(dataset) as Arc<dyn TableProvider>),
409396
Err(_) => None,
410397
};
411398

src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ impl PySessionContext {
852852
dataset: &Bound<'_, PyAny>,
853853
py: Python,
854854
) -> PyDataFusionResult<()> {
855-
let table: Arc<dyn TableProvider + Send> = Arc::new(Dataset::new(dataset, py)?);
855+
let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
856856

857857
self.ctx.register_table(name, table)?;
858858

src/dataframe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ impl PyDataFrame {
268268
}
269269
}
270270

271-
pub(crate) fn to_view_provider(&self) -> Arc<dyn TableProvider + Send> {
271+
pub(crate) fn to_view_provider(&self) -> Arc<dyn TableProvider> {
272272
self.df.as_ref().clone().into_view()
273273
}
274274

src/table.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,30 @@ use crate::utils::{get_tokio_runtime, validate_pycapsule};
3333
#[pyclass(name = "TableProvider", module = "datafusion")]
3434
#[derive(Clone)]
3535
pub struct PyTableProvider {
36-
pub(crate) provider: Arc<dyn TableProvider + Send>,
36+
pub(crate) provider: Arc<dyn TableProvider>,
3737
}
3838

3939
impl PyTableProvider {
40-
pub(crate) fn new(provider: Arc<dyn TableProvider + Send>) -> Self {
40+
pub(crate) fn new(provider: Arc<dyn TableProvider>) -> Self {
4141
Self { provider }
4242
}
4343

4444
/// Return a `PyTable` wrapper around this provider.
4545
///
4646
/// Historically callers chained `as_table().table()` to access the
47-
/// underlying `Arc<dyn TableProvider + Send>`. Prefer [`as_arc`] or
47+
/// underlying [`Arc<dyn TableProvider>`]. Prefer [`as_arc`] or
4848
/// [`into_inner`] for direct access instead.
4949
pub fn as_table(&self) -> PyTable {
5050
PyTable::new(Arc::clone(&self.provider))
5151
}
5252

5353
/// Return a clone of the inner [`TableProvider`].
54-
pub fn as_arc(&self) -> Arc<dyn TableProvider + Send> {
54+
pub fn as_arc(&self) -> Arc<dyn TableProvider> {
5555
Arc::clone(&self.provider)
5656
}
5757

5858
/// Consume this wrapper and return the inner [`TableProvider`].
59-
pub fn into_inner(self) -> Arc<dyn TableProvider + Send> {
59+
pub fn into_inner(self) -> Arc<dyn TableProvider> {
6060
self.provider
6161
}
6262
}
@@ -110,7 +110,8 @@ impl PyTableProvider {
110110
let name = CString::new("datafusion_table_provider").unwrap();
111111

112112
let runtime = get_tokio_runtime().0.handle().clone();
113-
let provider = FFI_TableProvider::new(Arc::clone(&self.provider), false, Some(runtime));
113+
let provider: Arc<dyn TableProvider + Send> = self.provider.clone();
114+
let provider = FFI_TableProvider::new(provider, false, Some(runtime));
114115

115116
PyCapsule::new(py, provider, Some(name.clone()))
116117
}

src/udtf.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ use std::sync::Arc;
2121
use crate::errors::{py_datafusion_err, to_datafusion_err};
2222
use crate::expr::PyExpr;
2323
use crate::table::PyTableProvider;
24-
use crate::utils::{
25-
table_provider_from_pycapsule, table_provider_send_to_table_provider, validate_pycapsule,
26-
};
24+
use crate::utils::{table_provider_from_pycapsule, validate_pycapsule};
2725
use datafusion::catalog::{TableFunctionImpl, TableProvider};
2826
use datafusion::error::Result as DataFusionResult;
2927
use datafusion::logical_expr::Expr;
@@ -88,7 +86,7 @@ impl PyTableFunction {
8886
fn call_python_table_function(
8987
func: &Arc<PyObject>,
9088
args: &[Expr],
91-
) -> DataFusionResult<Arc<dyn TableProvider + Send>> {
89+
) -> DataFusionResult<Arc<dyn TableProvider>> {
9290
let args = args
9391
.iter()
9492
.map(|arg| PyExpr::from(arg.clone()))
@@ -113,10 +111,7 @@ impl TableFunctionImpl for PyTableFunction {
113111
fn call(&self, args: &[Expr]) -> DataFusionResult<Arc<dyn TableProvider>> {
114112
match &self.inner {
115113
PyTableFunctionInner::FFIFunction(func) => func.call(args),
116-
PyTableFunctionInner::PythonFunction(obj) => {
117-
let send_result = call_python_table_function(obj, args)?;
118-
Ok(table_provider_send_to_table_provider(send_result))
119-
}
114+
PyTableFunctionInner::PythonFunction(obj) => call_python_table_function(obj, args),
120115
}
121116
}
122117
}

src/utils.rs

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -122,51 +122,17 @@ pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyRe
122122
Ok(())
123123
}
124124

125-
/// Convert a [`TableProvider`] wrapped in an [`Arc`] with a `Send` auto trait into one
126-
/// without the marker.
127-
///
128-
/// # Safety
129-
///
130-
/// Removing `Send` from a trait object only relaxes the bounds. The underlying vtable is
131-
/// unchanged, so it is safe to reuse the pointer produced by [`Arc::into_raw`].
132-
pub(crate) fn table_provider_send_to_table_provider(
133-
table: Arc<dyn TableProvider + Send>,
134-
) -> Arc<dyn TableProvider> {
135-
let raw: *const (dyn TableProvider + Send) = Arc::into_raw(table);
136-
// SAFETY: `Send` is an auto trait with no associated data, so the trait object layout
137-
// is identical and the pointer may be reinterpreted without changing the reference
138-
// count.
139-
unsafe { Arc::from_raw(raw as *const dyn TableProvider) }
140-
}
141-
142-
/// Convert a [`TableProvider`] wrapped in an [`Arc`] into one that also carries the `Send`
143-
/// auto trait.
144-
///
145-
/// # Safety
146-
///
147-
/// DataFusion's `TableProvider` trait requires `Send`, so the underlying provider implements
148-
/// the marker. This allows us to reinterpret the pointer as a `TableProvider + Send` trait
149-
/// object.
150-
pub(crate) fn table_provider_to_send(
151-
table: Arc<dyn TableProvider>,
152-
) -> Arc<dyn TableProvider + Send> {
153-
let raw: *const dyn TableProvider = Arc::into_raw(table);
154-
// SAFETY: The underlying type implements `Send`, so the pointer can be safely treated as
155-
// a `TableProvider + Send` trait object.
156-
unsafe { Arc::from_raw(raw as *const (dyn TableProvider + Send)) }
157-
}
158-
159125
pub(crate) fn table_provider_from_pycapsule(
160126
obj: &Bound<PyAny>,
161-
) -> PyResult<Option<Arc<dyn TableProvider + Send>>> {
127+
) -> PyResult<Option<Arc<dyn TableProvider>>> {
162128
if obj.hasattr("__datafusion_table_provider__")? {
163129
let capsule = obj.getattr("__datafusion_table_provider__")?.call0()?;
164130
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
165131
validate_pycapsule(capsule, "datafusion_table_provider")?;
166132

167133
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
168134
let provider: ForeignTableProvider = provider.into();
169-
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider + Send>))
135+
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
170136
} else {
171137
Ok(None)
172138
}

0 commit comments