Skip to content

Commit a2419d2

Browse files
committed
UNPICK changes to review
1 parent 0b7f18d commit a2419d2

File tree

15 files changed

+110
-347
lines changed

15 files changed

+110
-347
lines changed

docs/source/contributor-guide/ffi.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
3434
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
3535

3636
At first glance, it may appear the best way to do this is to add the ``datafusion-python``
37-
crate as a dependency, produce a DataFusion table in Rust, and then register it with the
37+
crate as a dependency, provide a ``PyTable``, and then to register it with the
3838
``SessionContext``. Unfortunately, this will not work.
3939

4040
When you produce your code as a Python library and it needs to interact with the DataFusion

docs/source/user-guide/data-sources.rst

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,9 @@ as Delta Lake. This will require a recent version of
152152
.. code-block:: python
153153
154154
from deltalake import DeltaTable
155-
from datafusion import TableProvider
156155
157156
delta_table = DeltaTable("path_to_table")
158-
provider = TableProvider.from_capsule(delta_table)
159-
ctx.register_table("my_delta_table", provider)
157+
ctx.register_table_provider("my_delta_table", delta_table)
160158
df = ctx.table("my_delta_table")
161159
df.show()
162160

docs/source/user-guide/io/table_provider.rst

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,29 +47,12 @@ A complete example can be found in the `examples folder <https://github.com/apac
4747
}
4848
}
4949
50-
Once you have this library available, you can construct a
51-
:py:class:`~datafusion.TableProvider` in Python and register it with the
52-
``SessionContext``. Table providers can be created either from the PyCapsule exposed by
53-
your Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`
54-
using ``TableProvider.from_view()``.
50+
Once you have this library available, in python you can register your table provider
51+
to the ``SessionContext``.
5552

5653
.. code-block:: python
5754
58-
from datafusion import SessionContext, TableProvider
59-
60-
ctx = SessionContext()
6155
provider = MyTableProvider()
56+
ctx.register_table_provider("my_table", provider)
6257
63-
capsule_provider = TableProvider.from_capsule(provider)
64-
65-
df = ctx.from_pydict({"a": [1]})
66-
view_provider = TableProvider.from_view(df)
67-
68-
ctx.register_table("capsule_table", capsule_provider)
69-
ctx.register_table("view_table", view_provider)
70-
71-
ctx.table("capsule_table").show()
72-
ctx.table("view_table").show()
73-
74-
Both ``TableProvider.from_capsule()`` and ``TableProvider.from_view()`` create
75-
table providers that can be registered with the SessionContext using ``register_table()``.
58+
ctx.table("my_table").show()

python/datafusion/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from . import functions, object_store, substrait, unparser
3434

3535
# The following imports are okay to remain as opaque to the user.
36-
from ._internal import Config, TableProvider
36+
from ._internal import Config
3737
from .catalog import Catalog, Database, Table
3838
from .col import col, column
3939
from .common import (
@@ -90,7 +90,6 @@
9090
"SessionContext",
9191
"Table",
9292
"TableFunction",
93-
"TableProvider",
9493
"WindowFrame",
9594
"WindowUDF",
9695
"catalog",

python/datafusion/catalog.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
if TYPE_CHECKING:
2828
import pyarrow as pa
2929

30-
from datafusion import TableProvider
31-
3230
try:
3331
from warnings import deprecated # Python 3.13+
3432
except ImportError:
@@ -124,8 +122,8 @@ def table(self, name: str) -> Table:
124122
"""Return the table with the given ``name`` from this schema."""
125123
return Table(self._raw_schema.table(name))
126124

127-
def register_table(self, name, table: Table | TableProvider) -> None:
128-
"""Register a table or table provider in this schema."""
125+
def register_table(self, name, table) -> None:
126+
"""Register a table provider in this schema."""
129127
if isinstance(table, Table):
130128
return self._raw_schema.register_table(name, table.table)
131129
return self._raw_schema.register_table(name, table)
@@ -221,8 +219,8 @@ def table(self, name: str) -> Table | None:
221219
"""Retrieve a specific table from this schema."""
222220
...
223221

224-
def register_table(self, name: str, table: Table | TableProvider) -> None: # noqa: B027
225-
"""Add a table to this schema.
222+
def register_table(self, name: str, table: Table) -> None: # noqa: B027
223+
"""Add a table from this schema.
226224
227225
This method is optional. If your schema provides a fixed list of tables, you do
228226
not need to implement this method.

python/datafusion/context.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import pandas as pd
4747
import polars as pl
4848

49-
from datafusion import TableProvider
5049
from datafusion.plan import ExecutionPlan, LogicalPlan
5150

5251

@@ -744,21 +743,16 @@ def register_view(self, name: str, df: DataFrame) -> None:
744743
view = df.into_view()
745744
self.ctx.register_table(name, view)
746745

747-
def register_table(self, name: str, table: Table | TableProvider) -> None:
748-
"""Register a :py:class:`~datafusion.catalog.Table` or ``TableProvider``.
746+
def register_table(self, name: str, table: Table) -> None:
747+
"""Register a :py:class: `~datafusion.catalog.Table` as a table.
749748
750-
The registered table can be referenced from SQL statements executed against
751-
this context.
749+
The registered table can be referenced from SQL statement executed against.
752750
753751
Args:
754752
name: Name of the resultant table.
755-
table: DataFusion :class:`Table` or :class:`TableProvider` to add to the
756-
session context.
753+
table: DataFusion table to add to the session context.
757754
"""
758-
if isinstance(table, Table):
759-
self.ctx.register_table(name, table.table)
760-
else:
761-
self.ctx.register_table(name, table)
755+
self.ctx.register_table(name, table.table)
762756

763757
def deregister_table(self, name: str) -> None:
764758
"""Remove a table from the session."""
@@ -778,18 +772,14 @@ def register_catalog_provider(
778772
self.ctx.register_catalog_provider(name, provider)
779773

780774
def register_table_provider(
781-
self, name: str, provider: TableProviderExportable | TableProvider
775+
self, name: str, provider: TableProviderExportable
782776
) -> None:
783777
"""Register a table provider.
784778
785-
Deprecated: use :meth:`register_table` instead.
779+
This table provider must have a method called ``__datafusion_table_provider__``
780+
which returns a PyCapsule that exposes a ``FFI_TableProvider``.
786781
"""
787-
warnings.warn(
788-
"register_table_provider is deprecated; use register_table",
789-
DeprecationWarning,
790-
stacklevel=2,
791-
)
792-
self.register_table(name, provider)
782+
self.ctx.register_table_provider(name, provider)
793783

794784
def register_udtf(self, func: TableFunction) -> None:
795785
"""Register a user defined table function."""

python/datafusion/dataframe.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
from datafusion._internal import DataFrame as DataFrameInternal
4141
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
4242
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
43-
from datafusion._internal import TableProvider as TableProviderInternal
4443
from datafusion.expr import Expr, SortExpr, sort_or_default
4544
from datafusion.plan import ExecutionPlan, LogicalPlan
4645
from datafusion.record_batch import RecordBatchStream
@@ -308,14 +307,8 @@ def __init__(self, df: DataFrameInternal) -> None:
308307
"""
309308
self.df = df
310309

311-
def into_view(self) -> TableProviderInternal:
312-
"""Convert ``DataFrame`` into a ``TableProvider`` view for registration.
313-
314-
This is the preferred way to obtain a view for
315-
:py:meth:`~datafusion.context.SessionContext.register_table`.
316-
``TableProvider.from_dataframe`` calls this method under the hood,
317-
and the older ``TableProvider.from_view`` helper is deprecated.
318-
"""
310+
def into_view(self) -> pa.Table:
311+
"""Convert DataFrame as a ViewTable which can be used in register_table."""
319312
return self.df.into_view()
320313

321314
def __getitem__(self, key: str | list[str]) -> DataFrame:

python/tests/test_context.py

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
SessionConfig,
2828
SessionContext,
2929
SQLOptions,
30-
TableProvider,
3130
column,
3231
literal,
3332
)
@@ -331,35 +330,6 @@ def test_deregister_table(ctx, database):
331330
assert public.names() == {"csv1", "csv2"}
332331

333332

334-
def test_register_table_from_dataframe_into_view(ctx):
335-
df = ctx.from_pydict({"a": [1, 2]})
336-
provider = df.into_view()
337-
ctx.register_table("view_tbl", provider)
338-
result = ctx.sql("SELECT * FROM view_tbl").collect()
339-
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
340-
341-
342-
def test_table_provider_from_capsule(ctx):
343-
df = ctx.from_pydict({"a": [1, 2]})
344-
provider = df.into_view()
345-
capsule = provider.__datafusion_table_provider__()
346-
provider2 = TableProvider.from_capsule(capsule)
347-
ctx.register_table("capsule_tbl", provider2)
348-
result = ctx.sql("SELECT * FROM capsule_tbl").collect()
349-
assert [b.to_pydict() for b in result] == [{"a": [1, 2]}]
350-
351-
352-
def test_table_provider_from_capsule_invalid():
353-
with pytest.raises(Exception): # noqa: B017
354-
TableProvider.from_capsule(object())
355-
356-
357-
def test_register_table_with_dataframe_errors(ctx):
358-
df = ctx.from_pydict({"a": [1]})
359-
with pytest.raises(Exception): # noqa: B017
360-
ctx.register_table("bad", df)
361-
362-
363333
def test_register_dataset(ctx):
364334
# create a RecordBatch and register it as a pyarrow.dataset.Dataset
365335
batch = pa.RecordBatch.from_arrays(

src/catalog.rs

Lines changed: 16 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use crate::dataset::Dataset;
1919
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
20-
use crate::table::PyTableProvider;
2120
use crate::utils::{validate_pycapsule, wait_for_future};
2221
use async_trait::async_trait;
2322
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
@@ -52,7 +51,7 @@ pub struct PySchema {
5251
#[pyclass(name = "RawTable", module = "datafusion.catalog", subclass)]
5352
#[derive(Clone)]
5453
pub struct PyTable {
55-
pub table: Arc<dyn TableProvider + Send>,
54+
pub table: Arc<dyn TableProvider>,
5655
}
5756

5857
impl From<Arc<dyn CatalogProvider>> for PyCatalog {
@@ -68,11 +67,11 @@ impl From<Arc<dyn SchemaProvider>> for PySchema {
6867
}
6968

7069
impl PyTable {
71-
pub fn new(table: Arc<dyn TableProvider + Send>) -> Self {
70+
pub fn new(table: Arc<dyn TableProvider>) -> Self {
7271
Self { table }
7372
}
7473

75-
pub fn table(&self) -> Arc<dyn TableProvider + Send> {
74+
pub fn table(&self) -> Arc<dyn TableProvider> {
7675
self.table.clone()
7776
}
7877
}
@@ -206,18 +205,15 @@ impl PySchema {
206205

207206
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
208207
let provider: ForeignTableProvider = provider.into();
209-
Arc::new(provider) as Arc<dyn TableProvider + Send>
208+
Arc::new(provider) as Arc<dyn TableProvider>
210209
} else {
211210
match table_provider.extract::<PyTable>() {
212211
Ok(py_table) => py_table.table,
213-
Err(_) => match table_provider.extract::<PyTableProvider>() {
214-
Ok(py_provider) => py_provider.into_inner(),
215-
Err(_) => {
216-
let py = table_provider.py();
217-
let provider = Dataset::new(&table_provider, py)?;
218-
Arc::new(provider) as Arc<dyn TableProvider + Send>
219-
}
220-
},
212+
Err(_) => {
213+
let py = table_provider.py();
214+
let provider = Dataset::new(&table_provider, py)?;
215+
Arc::new(provider) as Arc<dyn TableProvider>
216+
}
221217
}
222218
};
223219

@@ -298,7 +294,7 @@ impl RustWrappedPySchemaProvider {
298294
}
299295
}
300296

301-
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider + Send>>> {
297+
fn table_inner(&self, name: &str) -> PyResult<Option<Arc<dyn TableProvider>>> {
302298
Python::with_gil(|py| {
303299
let provider = self.schema_provider.bind(py);
304300
let py_table_method = provider.getattr("table")?;
@@ -309,30 +305,26 @@ impl RustWrappedPySchemaProvider {
309305
}
310306

311307
if py_table.hasattr("__datafusion_table_provider__")? {
312-
let capsule = py_table.getattr("__datafusion_table_provider__")?.call0()?;
308+
let capsule = provider.getattr("__datafusion_table_provider__")?.call0()?;
313309
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
314310
validate_pycapsule(capsule, "datafusion_table_provider")?;
315311

316312
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
317313
let provider: ForeignTableProvider = provider.into();
318314

319-
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider + Send>))
315+
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
320316
} else {
321317
if let Ok(inner_table) = py_table.getattr("table") {
322318
if let Ok(inner_table) = inner_table.extract::<PyTable>() {
323319
return Ok(Some(inner_table.table));
324320
}
325321
}
326322

327-
if let Ok(py_provider) = py_table.extract::<PyTableProvider>() {
328-
return Ok(Some(py_provider.into_inner()));
329-
}
330-
331323
match py_table.extract::<PyTable>() {
332324
Ok(py_table) => Ok(Some(py_table.table)),
333325
Err(_) => {
334326
let ds = Dataset::new(&py_table, py).map_err(py_datafusion_err)?;
335-
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider + Send>))
327+
Ok(Some(Arc::new(ds) as Arc<dyn TableProvider>))
336328
}
337329
}
338330
}
@@ -368,32 +360,15 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
368360
&self,
369361
name: &str,
370362
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
371-
// Convert from our internal Send type to the trait expected type
372-
match self.table_inner(name).map_err(to_datafusion_err)? {
373-
Some(table) => {
374-
// Safe conversion: we're widening the bounds (removing Send)
375-
let raw = Arc::into_raw(table);
376-
let wide: *const dyn TableProvider = raw as *const _;
377-
let arc = unsafe { Arc::from_raw(wide) };
378-
Ok(Some(arc))
379-
}
380-
None => Ok(None),
381-
}
363+
self.table_inner(name).map_err(to_datafusion_err)
382364
}
383365

384366
fn register_table(
385367
&self,
386368
name: String,
387369
table: Arc<dyn TableProvider>,
388370
) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
389-
// Convert from trait type to our internal Send type
390-
let send_table = {
391-
let raw = Arc::into_raw(table);
392-
let send: *const (dyn TableProvider + Send) = raw as *const _;
393-
unsafe { Arc::from_raw(send) }
394-
};
395-
396-
let py_table = PyTable::new(send_table);
371+
let py_table = PyTable::new(table);
397372
Python::with_gil(|py| {
398373
let provider = self.schema_provider.bind(py);
399374
let _ = provider
@@ -422,14 +397,7 @@ impl SchemaProvider for RustWrappedPySchemaProvider {
422397
// If we can turn this table provider into a `Dataset`, return it.
423398
// Otherwise, return None.
424399
let dataset = match Dataset::new(&table, py) {
425-
Ok(dataset) => {
426-
// Convert from our internal Send type to trait expected type
427-
let send_table = Arc::new(dataset) as Arc<dyn TableProvider + Send>;
428-
let raw = Arc::into_raw(send_table);
429-
let wide: *const dyn TableProvider = raw as *const _;
430-
let arc = unsafe { Arc::from_raw(wide) };
431-
Some(arc)
432-
}
400+
Ok(dataset) => Some(Arc::new(dataset) as Arc<dyn TableProvider>),
433401
Err(_) => None,
434402
};
435403

0 commit comments

Comments
 (0)