Skip to content

Commit 672e897

Browse files
committed
Revert "UNPICK changes to review"
This reverts commit 07d2718.
1 parent 07d2718 commit 672e897

File tree

18 files changed

+485
-156
lines changed

18 files changed

+485
-156
lines changed

docs/source/contributor-guide/ffi.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ as performant as possible and to utilize the features of DataFusion, you may dec
3434
your source in Rust and then expose it through `PyO3 <https://pyo3.rs>`_ as a Python library.
3535

3636
At first glance, it may appear the best way to do this is to add the ``datafusion-python``
37-
crate as a dependency, provide a ``PyTable``, and then to register it with the
37+
crate as a dependency, produce a DataFusion table in Rust, and then register it with the
3838
``SessionContext``. Unfortunately, this will not work.
3939

4040
When you produce your code as a Python library and it needs to interact with the DataFusion

docs/source/user-guide/data-sources.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,11 @@ as Delta Lake. This will require a recent version of
152152
.. code-block:: python
153153
154154
from deltalake import DeltaTable
155+
from datafusion import TableProvider
155156
156157
delta_table = DeltaTable("path_to_table")
157-
ctx.register_table_provider("my_delta_table", delta_table)
158+
provider = TableProvider.from_capsule(delta_table.__datafusion_table_provider__())
159+
ctx.register_table("my_delta_table", provider)
158160
df = ctx.table("my_delta_table")
159161
df.show()
160162

docs/source/user-guide/io/table_provider.rst

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,40 @@ A complete example can be found in the `examples folder <https://github.com/apac
3939
) -> PyResult<Bound<'py, PyCapsule>> {
4040
let name = CString::new("datafusion_table_provider").unwrap();
4141
42-
let provider = Arc::new(self.clone())
43-
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
44-
let provider = FFI_TableProvider::new(Arc::new(provider), false);
42+
let provider = Arc::new(self.clone());
43+
let provider = FFI_TableProvider::new(provider, false, None);
4544
4645
PyCapsule::new_bound(py, provider, Some(name.clone()))
4746
}
4847
}
4948
50-
Once you have this library available, in python you can register your table provider
51-
to the ``SessionContext``.
49+
Once you have this library available, you can construct a
50+
:py:class:`~datafusion.TableProvider` in Python and register it with the
51+
``SessionContext``. Table providers can be created either from the PyCapsule exposed by
52+
your Rust provider or from an existing :py:class:`~datafusion.dataframe.DataFrame`.
53+
Call the provider's ``__datafusion_table_provider__()`` method to obtain the capsule
54+
before constructing a ``TableProvider``. The ``TableProvider.from_view()`` helper is
55+
deprecated; instead use ``TableProvider.from_dataframe()`` or ``DataFrame.into_view()``.
5256

5357
.. code-block:: python
5458
59+
from datafusion import SessionContext, TableProvider
60+
61+
ctx = SessionContext()
5562
provider = MyTableProvider()
56-
ctx.register_table_provider("my_table", provider)
5763
58-
ctx.table("my_table").show()
64+
capsule = provider.__datafusion_table_provider__()
65+
capsule_provider = TableProvider.from_capsule(capsule)
66+
67+
df = ctx.from_pydict({"a": [1]})
68+
view_provider = TableProvider.from_dataframe(df)
69+
# or: view_provider = df.into_view()
70+
71+
ctx.register_table("capsule_table", capsule_provider)
72+
ctx.register_table("view_table", view_provider)
73+
74+
ctx.table("capsule_table").show()
75+
ctx.table("view_table").show()
76+
77+
Both ``TableProvider.from_capsule()`` and ``TableProvider.from_dataframe()`` create
78+
table providers that can be registered with the SessionContext using ``register_table()``.

examples/datafusion-ffi-example/python/tests/_test_table_function.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def test_ffi_table_function_call_directly():
5353
table_udtf = udtf(table_func, "my_table_func")
5454

5555
my_table = table_udtf()
56-
ctx.register_table_provider("t", my_table)
56+
ctx.register_table("t", my_table)
5757
result = ctx.table("t").collect()
5858

5959
assert len(result) == 2

examples/datafusion-ffi-example/python/tests/_test_table_provider.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
from __future__ import annotations
1919

2020
import pyarrow as pa
21-
from datafusion import SessionContext
21+
from datafusion import SessionContext, TableProvider
2222
from datafusion_ffi_example import MyTableProvider
2323

2424

2525
def test_table_loading():
2626
ctx = SessionContext()
2727
table = MyTableProvider(3, 2, 4)
28-
ctx.register_table_provider("t", table)
28+
ctx.register_table(
29+
"t", TableProvider.from_capsule(table.__datafusion_table_provider__())
30+
)
2931
result = ctx.table("t").collect()
3032

3133
assert len(result) == 4

python/datafusion/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from .io import read_avro, read_csv, read_json, read_parquet
5555
from .plan import ExecutionPlan, LogicalPlan
5656
from .record_batch import RecordBatch, RecordBatchStream
57+
from .table_provider import TableProvider
5758
from .user_defined import (
5859
Accumulator,
5960
AggregateUDF,
@@ -90,6 +91,7 @@
9091
"SessionContext",
9192
"Table",
9293
"TableFunction",
94+
"TableProvider",
9395
"WindowFrame",
9496
"WindowUDF",
9597
"catalog",

python/datafusion/catalog.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
if TYPE_CHECKING:
2828
import pyarrow as pa
2929

30+
from datafusion import TableProvider
31+
from datafusion.context import TableProviderExportable
32+
3033
try:
3134
from warnings import deprecated # Python 3.13+
3235
except ImportError:
@@ -82,7 +85,11 @@ def database(self, name: str = "public") -> Schema:
8285
"""Returns the database with the given ``name`` from this catalog."""
8386
return self.schema(name)
8487

85-
def register_schema(self, name, schema) -> Schema | None:
88+
def register_schema(
89+
self,
90+
name: str,
91+
schema: Schema | SchemaProvider | SchemaProviderExportable,
92+
) -> Schema | None:
8693
"""Register a schema with this catalog."""
8794
if isinstance(schema, Schema):
8895
return self.catalog.register_schema(name, schema._raw_schema)
@@ -122,8 +129,14 @@ def table(self, name: str) -> Table:
122129
"""Return the table with the given ``name`` from this schema."""
123130
return Table(self._raw_schema.table(name))
124131

125-
def register_table(self, name, table) -> None:
126-
"""Register a table provider in this schema."""
132+
def register_table(
133+
self, name: str, table: Table | TableProvider | TableProviderExportable
134+
) -> None:
135+
"""Register a table or table provider in this schema.
136+
137+
Objects implementing ``__datafusion_table_provider__`` are also supported
138+
and treated as :class:`TableProvider` instances.
139+
"""
127140
if isinstance(table, Table):
128141
return self._raw_schema.register_table(name, table.table)
129142
return self._raw_schema.register_table(name, table)
@@ -219,14 +232,19 @@ def table(self, name: str) -> Table | None:
219232
"""Retrieve a specific table from this schema."""
220233
...
221234

222-
def register_table(self, name: str, table: Table) -> None: # noqa: B027
223-
"""Add a table from this schema.
235+
def register_table( # noqa: B027
236+
self, name: str, table: Table | TableProvider | TableProviderExportable
237+
) -> None:
238+
"""Add a table to this schema.
224239
225240
This method is optional. If your schema provides a fixed list of tables, you do
226241
not need to implement this method.
242+
243+
Objects implementing ``__datafusion_table_provider__`` are also supported
244+
and treated as :class:`TableProvider` instances.
227245
"""
228246

229-
def deregister_table(self, name, cascade: bool) -> None: # noqa: B027
247+
def deregister_table(self, name: str, cascade: bool) -> None: # noqa: B027
230248
"""Remove a table from this schema.
231249
232250
This method is optional. If your schema provides a fixed list of tables, you do

python/datafusion/context.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import pandas as pd
4747
import polars as pl
4848

49+
from datafusion import TableProvider
4950
from datafusion.plan import ExecutionPlan, LogicalPlan
5051

5152

@@ -734,7 +735,7 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
734735
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
735736
# is the discussion on how we arrived at adding register_view
736737
def register_view(self, name: str, df: DataFrame) -> None:
737-
"""Register a :py:class: `~datafusion.detaframe.DataFrame` as a view.
738+
"""Register a :py:class:`~datafusion.dataframe.DataFrame` as a view.
738739
739740
Args:
740741
name (str): The name to register the view under.
@@ -743,16 +744,31 @@ def register_view(self, name: str, df: DataFrame) -> None:
743744
view = df.into_view()
744745
self.ctx.register_table(name, view)
745746

746-
def register_table(self, name: str, table: Table) -> None:
747-
"""Register a :py:class: `~datafusion.catalog.Table` as a table.
747+
def register_table(
748+
self, name: str, table: Table | TableProvider | TableProviderExportable
749+
) -> None:
750+
"""Register a :py:class:`~datafusion.catalog.Table` or ``TableProvider``.
748751
749-
The registered table can be referenced from SQL statement executed against.
752+
The registered table can be referenced from SQL statements executed against
753+
this context.
754+
755+
Plain :py:class:`~datafusion.dataframe.DataFrame` objects are not supported;
756+
convert them first with :meth:`datafusion.dataframe.DataFrame.into_view` or
757+
:meth:`datafusion.TableProvider.from_dataframe`.
758+
759+
Objects implementing ``__datafusion_table_provider__`` are also supported
760+
and treated as :class:`~datafusion.TableProvider` instances.
750761
751762
Args:
752763
name: Name of the resultant table.
753-
table: DataFusion table to add to the session context.
764+
table: DataFusion :class:`Table`, :class:`TableProvider`, or any object
765+
implementing ``__datafusion_table_provider__`` to add to the session
766+
context.
754767
"""
755-
self.ctx.register_table(name, table.table)
768+
if isinstance(table, Table):
769+
self.ctx.register_table(name, table.table)
770+
else:
771+
self.ctx.register_table(name, table)
756772

757773
def deregister_table(self, name: str) -> None:
758774
"""Remove a table from the session."""
@@ -772,14 +788,21 @@ def register_catalog_provider(
772788
self.ctx.register_catalog_provider(name, provider)
773789

774790
def register_table_provider(
775-
self, name: str, provider: TableProviderExportable
791+
self, name: str, provider: Table | TableProvider | TableProviderExportable
776792
) -> None:
777793
"""Register a table provider.
778794
779-
This table provider must have a method called ``__datafusion_table_provider__``
780-
which returns a PyCapsule that exposes a ``FFI_TableProvider``.
795+
Deprecated: use :meth:`register_table` instead.
796+
797+
Objects implementing ``__datafusion_table_provider__`` are also supported
798+
and treated as :class:`~datafusion.TableProvider` instances.
781799
"""
782-
self.ctx.register_table_provider(name, provider)
800+
warnings.warn(
801+
"register_table_provider is deprecated; use register_table",
802+
DeprecationWarning,
803+
stacklevel=2,
804+
)
805+
self.register_table(name, provider)
783806

784807
def register_udtf(self, func: TableFunction) -> None:
785808
"""Register a user defined table function."""

python/datafusion/dataframe.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import pyarrow as pa
5454

5555
from datafusion._internal import expr as expr_internal
56+
from datafusion.table_provider import TableProvider
5657

5758
from enum import Enum
5859

@@ -307,9 +308,17 @@ def __init__(self, df: DataFrameInternal) -> None:
307308
"""
308309
self.df = df
309310

310-
def into_view(self) -> pa.Table:
311-
"""Convert DataFrame as a ViewTable which can be used in register_table."""
312-
return self.df.into_view()
311+
def into_view(self) -> TableProvider:
312+
"""Convert ``DataFrame`` into a ``TableProvider`` view for registration.
313+
314+
This is the preferred way to obtain a view for
315+
:py:meth:`~datafusion.context.SessionContext.register_table`.
316+
``TableProvider.from_dataframe`` calls this method under the hood,
317+
and the older ``TableProvider.from_view`` helper is deprecated.
318+
"""
319+
from datafusion.table_provider import TableProvider as _TableProvider
320+
321+
return _TableProvider(self.df.into_view())
313322

314323
def __getitem__(self, key: str | list[str]) -> DataFrame:
315324
"""Return a new :py:class:`DataFrame` with the specified column or columns.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Wrapper helpers for :mod:`datafusion._internal.TableProvider`."""
18+
19+
from __future__ import annotations
20+
21+
from typing import Any
22+
import warnings
23+
24+
import datafusion._internal as df_internal
25+
26+
_InternalTableProvider = df_internal.TableProvider
27+
28+
29+
class TableProvider:
    """High level wrapper around :mod:`datafusion._internal.TableProvider`.

    An instance stores a single attribute, ``_table_provider`` (the wrapped
    low level provider). Attribute lookups that are not satisfied by the
    wrapper itself are forwarded to that wrapped object.
    """

    __slots__ = ("_table_provider",)

    def __init__(
        self, table_provider: _InternalTableProvider | TableProvider
    ) -> None:
        """Wrap a low level :class:`~datafusion._internal.TableProvider`.

        Args:
            table_provider: The internal provider to wrap. An existing
                :class:`TableProvider` wrapper is also accepted, in which
                case its wrapped provider is reused.

        Raises:
            TypeError: If ``table_provider`` is neither an internal provider
                nor a :class:`TableProvider` wrapper.
        """
        # Unwrap first so that wrapping a wrapper never nests.
        if isinstance(table_provider, TableProvider):
            table_provider = table_provider._table_provider

        if not isinstance(table_provider, _InternalTableProvider):
            msg = "Expected a datafusion._internal.TableProvider instance."
            raise TypeError(msg)

        self._table_provider = table_provider

    # ------------------------------------------------------------------
    # constructors
    # ------------------------------------------------------------------
    @classmethod
    def _wrap(cls, provider: _InternalTableProvider | TableProvider) -> TableProvider:
        """Return ``provider`` unchanged if already wrapped, else wrap it."""
        if isinstance(provider, cls):
            return provider
        return cls(provider)

    @classmethod
    def from_capsule(cls, capsule: Any) -> TableProvider:
        """Create a :class:`TableProvider` from a PyCapsule.

        Args:
            capsule: The capsule handed to
                ``datafusion._internal.TableProvider.from_capsule``.
        """
        provider = _InternalTableProvider.from_capsule(capsule)
        return cls(provider)

    @classmethod
    def from_dataframe(cls, df: Any) -> TableProvider:
        """Create a :class:`TableProvider` from a :class:`DataFrame`.

        Args:
            df: Either the Python ``DataFrame`` wrapper or the object it
                holds in its ``df`` attribute.
        """
        # Imported lazily; dataframe.py imports this module as well.
        from datafusion.dataframe import DataFrame as DataFrameWrapper

        if isinstance(df, DataFrameWrapper):
            df = df.df

        provider = _InternalTableProvider.from_dataframe(df)
        return cls(provider)

    @classmethod
    def from_view(cls, df: Any) -> TableProvider:
        """Create a :class:`TableProvider` via the internal ``from_view``.

        Deprecated. Use :meth:`DataFrame.into_view` or
        :meth:`TableProvider.from_dataframe` instead. Calling this emits a
        :class:`DeprecationWarning`.

        Args:
            df: Either the Python ``DataFrame`` wrapper or the object it
                holds in its ``df`` attribute.
        """
        # Imported lazily; dataframe.py imports this module as well.
        from datafusion.dataframe import DataFrame as DataFrameWrapper

        if isinstance(df, DataFrameWrapper):
            df = df.df

        provider = _InternalTableProvider.from_view(df)
        warnings.warn(
            "TableProvider.from_view is deprecated; use DataFrame.into_view or "
            "TableProvider.from_dataframe instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return cls(provider)

    # ------------------------------------------------------------------
    # passthrough helpers
    # ------------------------------------------------------------------
    def __getattr__(self, name: str) -> Any:
        """Delegate attribute lookup to the wrapped provider.

        Python only calls this after normal lookup has failed. The slot name
        itself is never delegated: when ``_table_provider`` is unset (for
        example on an instance created with ``__new__``), reading it below
        would re-enter this method and recurse until ``RecursionError``.

        Raises:
            AttributeError: If the wrapped provider is unset or does not
                define ``name``.
        """
        if name == "_table_provider":
            raise AttributeError(name)
        return getattr(self._table_provider, name)

    def __repr__(self) -> str:  # pragma: no cover - simple delegation
        """Return a representation of the wrapped provider."""
        return repr(self._table_provider)

    def __datafusion_table_provider__(self) -> Any:
        """Expose the wrapped provider for FFI integrations.

        Returns whatever the wrapped provider's own
        ``__datafusion_table_provider__()`` returns.
        """
        return self._table_provider.__datafusion_table_provider__()
102+
103+
104+
__all__ = ["TableProvider"]

0 commit comments

Comments
 (0)