Skip to content

Commit 51c0f3e

Browse files
committed
Add Rust helper for minimal in-memory table provider
This commit introduces a new Rust-side helper function, `make_table_provider_capsule`, which constructs a minimal in-memory table provider capsule and registers it with the internal catalog module. The helper is now exposed via `datafusion.catalog.make_table_provider_capsule()`. Additionally, the user guide has been updated to emphasize the destructor requirement for externally supplied capsules. A revised PyCapsule regression example has been included, along with a regression test to ensure the new helper correctly interacts with `SessionContext.read_table`.
1 parent cefeb8d commit 51c0f3e

File tree

5 files changed

+81
-27
lines changed

5 files changed

+81
-27
lines changed

docs/source/user-guide/io/table_provider.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ via `PyCapsule <https://pyo3.rs/main/doc/pyo3/types/struct.pycapsule>`_.
2828

2929
A complete example can be found in the `examples folder <https://github.com/apache/datafusion-python/tree/main/examples>`_.
3030

31+
.. note::
32+
33+
DataFusion validates that table-provider capsules were created through the
34+
``datafusion_ffi`` helpers so that the release callback is installed. Use a
35+
helper such as :func:`datafusion.catalog.make_table_provider_capsule` (for an
36+
empty in-memory provider) or the constructors provided by :mod:`datafusion_ffi`
37+
when fabricating capsules for tests and examples instead of calling
38+
``PyCapsule_New`` directly.
39+
3140
.. code-block:: rust
3241
3342
#[pymethods]

examples/pycapsule_failure.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,53 @@
1+
"""Demonstrate the destructor requirement for table-provider PyCapsules.
2+
3+
DataFusion rejects table-provider capsules that are fabricated directly via
4+
``PyCapsule_New`` because they lack the release hook installed by the
5+
``datafusion_ffi`` helpers. Use :func:`datafusion.catalog.make_table_provider_capsule`
6+
or the Rust-side constructors exposed by :mod:`datafusion_ffi` to obtain a valid
7+
capsule instead of constructing one manually.
8+
"""
9+
110
from __future__ import annotations
211

312
import ctypes
413

514
from datafusion import SessionContext, Table
15+
from datafusion.catalog import make_table_provider_capsule
16+
17+
_PYCAPSULE_NEW = ctypes.pythonapi.PyCapsule_New
18+
_PYCAPSULE_NEW.restype = ctypes.py_object
19+
_PYCAPSULE_NEW.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
620

721

822
# Keep the backing memory alive for the lifetime of the module so the capsule
923
# always wraps a valid (non-null) pointer. The capsule content is irrelevant for
1024
# this regression example—we only need a non-null address.
1125
_DUMMY_CAPSULE_BYTES = ctypes.create_string_buffer(b"x")
1226

13-
class CapsuleContainer:
14-
def __init__(self):
15-
self.__datafusion_table_provider__ = make_table_provider_capsule
1627

17-
def make_table_provider_capsule() -> object:
18-
"""Create a dummy PyCapsule with the expected table provider name."""
28+
def make_invalid_capsule() -> object:
29+
"""Create an intentionally invalid PyCapsule without a destructor."""
1930

20-
pycapsule_new = ctypes.pythonapi.PyCapsule_New
21-
pycapsule_new.restype = ctypes.py_object
22-
pycapsule_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
2331
dummy_ptr = ctypes.cast(_DUMMY_CAPSULE_BYTES, ctypes.c_void_p)
24-
return pycapsule_new(dummy_ptr, b"datafusion_table_provider", None)
32+
return _PYCAPSULE_NEW(dummy_ptr, b"datafusion_table_provider", None)
2533

2634

2735
def main() -> None:
28-
"""Attempt to use the capsule the same way existing callers do."""
36+
"""Showcase both the failure and the supported code paths."""
2937

3038
ctx = SessionContext()
39+
3140
try:
32-
capsule = CapsuleContainer()
33-
except Exception as err:
34-
print("Creating the PyCapsule failed:", err)
35-
return
41+
ctx.read_table(make_invalid_capsule())
42+
except ValueError as err:
43+
print("Creating the PyCapsule failed, as expected:", err)
44+
45+
valid_capsule = make_table_provider_capsule()
46+
ctx.read_table(valid_capsule)
3647

48+
valid_capsule_for_table = make_table_provider_capsule()
49+
Table.from_table_provider_capsule(valid_capsule_for_table)
3750

38-
ctx.read_table(capsule)
39-
4051

4152
if __name__ == "__main__":
42-
main()
53+
main()

python/datafusion/catalog.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"Schema",
4343
"SchemaProvider",
4444
"Table",
45+
"make_table_provider_capsule",
4546
]
4647

4748

@@ -182,9 +183,7 @@ def from_dataset(dataset: pa.dataset.Dataset) -> Table:
182183
def from_table_provider_capsule(capsule: object) -> Table:
183184
"""Wrap a validated table provider :class:`PyCapsule` as a :class:`Table`."""
184185

185-
return Table(
186-
df_internal.catalog.RawTable.from_table_provider_capsule(capsule)
187-
)
186+
return Table(df_internal.catalog.RawTable.from_table_provider_capsule(capsule))
188187

189188
@property
190189
def schema(self) -> pa.Schema:
@@ -197,6 +196,18 @@ def kind(self) -> str:
197196
return self._inner.kind
198197

199198

199+
def make_table_provider_capsule() -> object:
200+
"""Return a minimal table-provider capsule with a valid release hook.
201+
202+
DataFusion validates that table-provider capsules originate from
203+
:mod:`datafusion_ffi` helpers so the release callback is registered. This
204+
helper exposes that functionality to Python callers by fabricating an empty
205+
in-memory table provider using :mod:`datafusion_ffi` under the hood.
206+
"""
207+
208+
return df_internal.catalog.make_table_provider_capsule()
209+
210+
200211
class CatalogProvider(ABC):
201212
"""Abstract class for defining a Python based Catalog Provider."""
202213

python/tests/test_context.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
column,
3333
literal,
3434
)
35+
from datafusion.catalog import make_table_provider_capsule
3536

3637
_PYCAPSULE_NEW = ctypes.pythonapi.PyCapsule_New
3738
_PYCAPSULE_NEW.restype = ctypes.py_object
@@ -372,23 +373,29 @@ def __datafusion_table_provider__(self) -> object:
372373
ctx.read_table(container)
373374

374375
with pytest.raises(ValueError, match="missing a destructor"):
375-
Table.from_table_provider_capsule(
376-
container.__datafusion_table_provider__()
377-
)
376+
Table.from_table_provider_capsule(container.__datafusion_table_provider__())
378377

379378

380379
def test_read_table_with_raw_table_provider_capsule(ctx):
381380
df_internal = pytest.importorskip("datafusion._internal")
382-
assert hasattr(
383-
df_internal.catalog.RawTable, "from_table_provider_capsule"
384-
)
381+
assert hasattr(df_internal.catalog.RawTable, "from_table_provider_capsule")
385382

386383
capsule, _backing = _make_invalid_table_provider_capsule()
387384

388385
with pytest.raises(ValueError, match="missing a destructor"):
389386
ctx.read_table(capsule)
390387

391388

389+
def test_read_table_accepts_valid_table_provider_capsule(ctx):
390+
capsule = make_table_provider_capsule()
391+
result = ctx.read_table(capsule).collect()
392+
assert result == []
393+
394+
table_capsule = make_table_provider_capsule()
395+
table = Table.from_table_provider_capsule(table_capsule)
396+
assert table.kind in {"physical", "view", "temporary"}
397+
398+
392399
def test_deregister_table(ctx, database):
393400
default = ctx.catalog()
394401
public = default.schema("public")

src/catalog.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,24 @@ use crate::dataset::Dataset;
1919
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult};
2020
use crate::table::PyTable;
2121
use crate::utils::{validate_pycapsule, wait_for_future};
22+
use arrow_array::RecordBatch;
23+
use arrow_schema::Schema;
2224
use async_trait::async_trait;
2325
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
2426
use datafusion::common::DataFusionError;
2527
use datafusion::{
2628
catalog::{CatalogProvider, SchemaProvider},
27-
datasource::TableProvider,
29+
datasource::{MemTable, TableProvider},
2830
};
2931
use datafusion_ffi::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
32+
use datafusion_ffi::table_provider::FFI_TableProvider;
3033
use pyo3::exceptions::PyKeyError;
3134
use pyo3::prelude::*;
3235
use pyo3::types::PyCapsule;
3336
use pyo3::IntoPyObjectExt;
3437
use std::any::Any;
3538
use std::collections::HashSet;
39+
use std::ffi::CString;
3640
use std::sync::Arc;
3741

3842
#[pyclass(name = "RawCatalog", module = "datafusion.catalog", subclass)]
@@ -452,10 +456,22 @@ impl CatalogProvider for RustWrappedPyCatalogProvider {
452456
}
453457
}
454458

459+
#[pyfunction]
460+
pub fn make_table_provider_capsule(py: Python<'_>) -> PyResult<Bound<'_, PyCapsule>> {
461+
let schema = Arc::new(Schema::new(vec![]));
462+
let batch = RecordBatch::new_empty(Arc::clone(&schema));
463+
let provider = MemTable::try_new(schema, vec![vec![batch]]).map_err(py_datafusion_err)?;
464+
let provider = FFI_TableProvider::new(Arc::new(provider), false, None);
465+
let name = CString::new("datafusion_table_provider").expect("static capsule name");
466+
467+
PyCapsule::new(py, provider, Some(name))
468+
}
469+
455470
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
456471
m.add_class::<PyCatalog>()?;
457472
m.add_class::<PySchema>()?;
458473
m.add_class::<PyTable>()?;
474+
m.add_function(pyo3::wrap_pyfunction!(make_table_provider_capsule, m)?)?;
459475

460476
Ok(())
461477
}

0 commit comments

Comments
 (0)