From 30fc3d5c7c6f8f2588685351b94bb9aa217ee7bb Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 1 Apr 2026 09:44:22 -0400 Subject: [PATCH 1/6] Add missing SessionContext read/register methods for Arrow IPC and batches Add read_arrow, read_empty, register_arrow, and register_batch methods to SessionContext, exposing upstream DataFusion v53 functionality. The write_* methods and read_batch/read_batches are already covered by DataFrame.write_* and SessionContext.from_arrow respectively. Closes #1458. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/context.rs | 63 +++++++++++++++++++++++++++++++- python/datafusion/context.py | 69 ++++++++++++++++++++++++++++++++++++ python/tests/test_context.py | 39 ++++++++++++++++++++ 3 files changed, 170 insertions(+), 1 deletion(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 53994d2f5..4c001b55b 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -41,7 +41,7 @@ use datafusion::execution::context::{ }; use datafusion::execution::disk_manager::DiskManagerMode; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool}; -use datafusion::execution::options::ReadOptions; +use datafusion::execution::options::{ArrowReadOptions, ReadOptions}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::{ @@ -956,6 +956,39 @@ impl PySessionContext { Ok(()) } + #[pyo3(signature = (name, path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))] + pub fn register_arrow( + &self, + name: &str, + path: &str, + schema: Option>, + file_extension: &str, + table_partition_cols: Vec<(String, PyArrowType)>, + py: Python, + ) -> PyDataFusionResult<()> { + let mut options = ArrowReadOptions::default().table_partition_cols( + table_partition_cols + .into_iter() + .map(|(name, ty)| (name, ty.0)) + .collect::>(), + ); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + + let result = self.ctx.register_arrow(name, path, options); + wait_for_future(py, result)??; + Ok(()) + } + + pub fn register_batch( + &self, + name: &str, + batch: PyArrowType, + ) -> PyDataFusionResult<()> { + self.ctx.register_batch(name, batch.0)?; + Ok(()) + } + // Registers a PyArrow.Dataset pub fn register_dataset( &self, @@ -1184,6 +1217,34 @@ impl PySessionContext { Ok(PyDataFrame::new(df)) } + pub fn read_empty(&self) -> PyDataFusionResult { + let df = self.ctx.read_empty()?; + Ok(PyDataFrame::new(df)) + } + + #[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))] + pub fn read_arrow( + &self, + path: &str, + schema: Option>, + file_extension: &str, + table_partition_cols: Vec<(String, PyArrowType)>, + py: Python, + ) -> PyDataFusionResult { + let mut options = ArrowReadOptions::default().table_partition_cols( + table_partition_cols + .into_iter() + .map(|(name, ty)| (name, ty.0)) + .collect::>(), + ); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + + let result = self.ctx.read_arrow(path, options); + let df = wait_for_future(py, result)??; + Ok(PyDataFrame::new(df)) + } + pub fn read_table(&self, table: Bound<'_, PyAny>) -> PyDataFusionResult { let session = self.clone().into_bound_py_any(table.py())?; let table = PyTable::new(table, Some(session))?; diff --git a/python/datafusion/context.py b/python/datafusion/context.py index c8edc816f..c2a06dc82 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -894,6 +894,15 @@ def register_udtf(self, func: TableFunction) -> None: """Register a user defined table function.""" self.ctx.register_udtf(func._udtf) + def register_batch(self, name: str, batch: pa.RecordBatch) -> None: + """Register a single :py:class:`pa.RecordBatch` as a table. + + Args: + name: Name of the resultant table. + batch: Record batch to register as a table. + """ + self.ctx.register_batch(name, batch) + def register_record_batches( self, name: str, partitions: list[list[pa.RecordBatch]] ) -> None: @@ -1092,6 +1101,33 @@ def register_avro( name, str(path), schema, file_extension, table_partition_cols ) + def register_arrow( + self, + name: str, + path: str | pathlib.Path, + schema: pa.Schema | None = None, + file_extension: str = ".arrow", + table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, + ) -> None: + """Register an Arrow IPC file as a table. + + The registered table can be referenced from SQL statements executed + against this context. + + Args: + name: Name of the table to register. + path: Path to the Arrow IPC file. + schema: The data source schema. + file_extension: File extension to select. + table_partition_cols: Partition columns. + """ + if table_partition_cols is None: + table_partition_cols = [] + table_partition_cols = _convert_table_partition_cols(table_partition_cols) + self.ctx.register_arrow( + name, str(path), schema, file_extension, table_partition_cols + ) + def register_dataset(self, name: str, dataset: pa.dataset.Dataset) -> None: """Register a :py:class:`pa.dataset.Dataset` as a table. @@ -1328,6 +1364,39 @@ def read_avro( self.ctx.read_avro(str(path), schema, file_partition_cols, file_extension) ) + def read_arrow( + self, + path: str | pathlib.Path, + schema: pa.Schema | None = None, + file_extension: str = ".arrow", + file_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, + ) -> DataFrame: + """Create a :py:class:`DataFrame` for reading an Arrow IPC data source. + + Args: + path: Path to the Arrow IPC file. + schema: The data source schema. + file_extension: File extension to select. + file_partition_cols: Partition columns. + + Returns: + DataFrame representation of the read Arrow IPC file. + """ + if file_partition_cols is None: + file_partition_cols = [] + file_partition_cols = _convert_table_partition_cols(file_partition_cols) + return DataFrame( + self.ctx.read_arrow(str(path), schema, file_extension, file_partition_cols) + ) + + def read_empty(self) -> DataFrame: + """Create an empty :py:class:`DataFrame` with no columns or rows. + + Returns: + An empty DataFrame. + """ + return DataFrame(self.ctx.read_empty()) + def read_table( self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset ) -> DataFrame: diff --git a/python/tests/test_context.py b/python/tests/test_context.py index 5df6ed20f..a4a82cdf6 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -668,6 +668,45 @@ def test_read_avro(ctx): assert avro_df is not None +def test_read_arrow(ctx, tmp_path): + # Write an Arrow IPC file, then read it back + table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]}) + arrow_path = tmp_path / "test.arrow" + with pa.ipc.new_file(str(arrow_path), table.schema) as writer: + writer.write_table(table) + + df = ctx.read_arrow(str(arrow_path)) + result = df.collect() + assert result[0].column(0) == pa.array([1, 2, 3]) + assert result[0].column(1) == pa.array(["x", "y", "z"]) + + +def test_read_empty(ctx): + df = ctx.read_empty() + result = df.collect() + assert result[0].num_columns == 0 + + +def test_register_arrow(ctx, tmp_path): + # Write an Arrow IPC file, then register and query it + table = pa.table({"x": [10, 20, 30]}) + arrow_path = tmp_path / "test.arrow" + with pa.ipc.new_file(str(arrow_path), table.schema) as writer: + writer.write_table(table) + + ctx.register_arrow("arrow_tbl", str(arrow_path)) + result = ctx.sql("SELECT * FROM arrow_tbl").collect() + assert result[0].column(0) == pa.array([10, 20, 30]) + + +def test_register_batch(ctx): + batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) + ctx.register_batch("batch_tbl", batch) + result = ctx.sql("SELECT * FROM batch_tbl").collect() + assert result[0].column(0) == pa.array([1, 2, 3]) + assert result[0].column(1) == pa.array([4, 5, 6]) + + def test_create_sql_options(): SQLOptions() From e494bed4587545b8d9ebe02da602a5d34b9c799f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 5 Apr 2026 07:41:52 -0400 Subject: [PATCH 2/6] Remove redundant read_empty Rust binding, make Python read_empty an alias for empty_table Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/context.rs | 5 ----- python/datafusion/context.py | 4 +++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 4c001b55b..86675123e 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -1217,11 +1217,6 @@ impl PySessionContext { Ok(PyDataFrame::new(df)) } - pub fn read_empty(&self) -> PyDataFusionResult { - let df = self.ctx.read_empty()?; - Ok(PyDataFrame::new(df)) - } - #[pyo3(signature = (path, schema=None, file_extension=".arrow", table_partition_cols=vec![]))] pub fn read_arrow( &self, diff --git a/python/datafusion/context.py b/python/datafusion/context.py index c2a06dc82..cb5f89f40 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1392,10 +1392,12 @@ def read_arrow( def read_empty(self) -> DataFrame: """Create an empty :py:class:`DataFrame` with no columns or rows. + This is an alias for :meth:`empty_table`. + Returns: An empty DataFrame. """ - return DataFrame(self.ctx.read_empty()) + return self.empty_table() def read_table( self, table: Table | TableProviderExportable | DataFrame | pa.dataset.Dataset From 246105dda794f82628a154ed4116d9dc3dc680f3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 5 Apr 2026 07:42:08 -0400 Subject: [PATCH 3/6] Add pathlib.Path and empty batch tests for Arrow IPC and register_batch Co-Authored-By: Claude Opus 4.6 (1M context) --- python/tests/test_context.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/tests/test_context.py b/python/tests/test_context.py index a4a82cdf6..9db6c0435 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -680,6 +680,11 @@ def test_read_arrow(ctx, tmp_path): assert result[0].column(0) == pa.array([1, 2, 3]) assert result[0].column(1) == pa.array(["x", "y", "z"]) + # Also verify pathlib.Path works + df = ctx.read_arrow(arrow_path) + result = df.collect() + assert result[0].column(0) == pa.array([1, 2, 3]) + def test_read_empty(ctx): df = ctx.read_empty() @@ -698,6 +703,11 @@ def test_register_arrow(ctx, tmp_path): result = ctx.sql("SELECT * FROM arrow_tbl").collect() assert result[0].column(0) == pa.array([10, 20, 30]) + # Also verify pathlib.Path works + ctx.register_arrow("arrow_tbl_path", arrow_path) + result = ctx.sql("SELECT * FROM arrow_tbl_path").collect() + assert result[0].column(0) == pa.array([10, 20, 30]) + def test_register_batch(ctx): batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) @@ -707,6 +717,13 @@ def test_register_batch(ctx): assert result[0].column(1) == pa.array([4, 5, 6]) +def test_register_batch_empty(ctx): + batch = pa.RecordBatch.from_pydict({"a": pa.array([], type=pa.int64())}) + ctx.register_batch("empty_batch_tbl", batch) + result = ctx.sql("SELECT * FROM empty_batch_tbl").collect() + assert result[0].num_rows == 0 + + def test_create_sql_options(): SQLOptions() From 4a2d7ba373ae694295eed79487e8a3f3b722773a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 5 Apr 2026 07:42:15 -0400 Subject: [PATCH 4/6] Make test_read_empty more robust with length and num_rows checks Co-Authored-By: Claude Opus 4.6 (1M context) --- python/tests/test_context.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/tests/test_context.py b/python/tests/test_context.py index 9db6c0435..0b2ce54a5 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -689,7 +689,9 @@ def test_read_arrow(ctx, tmp_path): def test_read_empty(ctx): df = ctx.read_empty() result = df.collect() + assert len(result) == 1 assert result[0].num_columns == 0 + assert result[0].num_rows == 0 def test_register_arrow(ctx, tmp_path): From 03092ed4b5aa2f0a94478da7750058fcb5317c68 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 5 Apr 2026 07:48:29 -0400 Subject: [PATCH 5/6] Add examples to docstrings for new register/read methods Co-Authored-By: Claude Opus 4.6 (1M context) --- conftest.py | 3 +++ python/datafusion/context.py | 51 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/conftest.py b/conftest.py index 73e90077a..0a56d54ec 100644 --- a/conftest.py +++ b/conftest.py @@ -19,6 +19,7 @@ import datafusion as dfn import numpy as np +import pyarrow as pa import pytest from datafusion import col, lit from datafusion import functions as F @@ -29,6 +30,8 @@ def _doctest_namespace(doctest_namespace: dict) -> None: """Add common imports to the doctest namespace.""" doctest_namespace["dfn"] = dfn doctest_namespace["np"] = np + doctest_namespace["pa"] = pa doctest_namespace["col"] = col doctest_namespace["lit"] = lit doctest_namespace["F"] = F + doctest_namespace["ctx"] = dfn.SessionContext() diff --git a/python/datafusion/context.py b/python/datafusion/context.py index cb5f89f40..c59c4b082 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -900,6 +900,17 @@ def register_batch(self, name: str, batch: pa.RecordBatch) -> None: Args: name: Name of the resultant table. batch: Record batch to register as a table. + + Examples: + >>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]}) + >>> ctx.register_batch("batch_tbl", batch) + >>> ctx.sql("SELECT * FROM batch_tbl").collect()[0].column(0) + + [ + 1, + 2, + 3 + ] """ self.ctx.register_batch(name, batch) @@ -1120,6 +1131,22 @@ def register_arrow( schema: The data source schema. file_extension: File extension to select. table_partition_cols: Partition columns. + + Examples: + >>> import tempfile, os + >>> table = pa.table({"x": [10, 20, 30]}) + >>> with tempfile.TemporaryDirectory() as tmpdir: + ... path = os.path.join(tmpdir, "data.arrow") + ... with pa.ipc.new_file(path, table.schema) as writer: + ... writer.write_table(table) + ... ctx.register_arrow("arrow_tbl", path) + ... ctx.sql("SELECT * FROM arrow_tbl").collect()[0].column(0) + + [ + 10, + 20, + 30 + ] """ if table_partition_cols is None: table_partition_cols = [] @@ -1381,6 +1408,22 @@ def read_arrow( Returns: DataFrame representation of the read Arrow IPC file. + + Examples: + >>> import tempfile, os + >>> table = pa.table({"a": [1, 2, 3]}) + >>> with tempfile.TemporaryDirectory() as tmpdir: + ... path = os.path.join(tmpdir, "data.arrow") + ... with pa.ipc.new_file(path, table.schema) as writer: + ... writer.write_table(table) + ... df = ctx.read_arrow(path) + ... df.collect()[0].column(0) + + [ + 1, + 2, + 3 + ] """ if file_partition_cols is None: file_partition_cols = [] @@ -1396,6 +1439,14 @@ def read_empty(self) -> DataFrame: Returns: An empty DataFrame. + + Examples: + >>> df = ctx.read_empty() + >>> result = df.collect() + >>> len(result) + 1 + >>> result[0].num_columns + 0 """ return self.empty_table() From 0f96ea3b9d4f4d2b05e7ba1373bdc9a0950a87f2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 5 Apr 2026 10:30:57 -0400 Subject: [PATCH 6/6] Empty table actually returns record batch of length one but there are no columns --- python/tests/test_context.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/tests/test_context.py b/python/tests/test_context.py index 0b2ce54a5..b4b3648ac 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -691,7 +691,11 @@ def test_read_empty(ctx): result = df.collect() assert len(result) == 1 assert result[0].num_columns == 0 - assert result[0].num_rows == 0 + + df = ctx.empty_table() + result = df.collect() + assert len(result) == 1 + assert result[0].num_columns == 0 def test_register_arrow(ctx, tmp_path):