
Commit 2b4cf60

Revert "UNPICK changes to review"
This reverts commit c32f08d.
1 parent: c32f08d

File tree

13 files changed (+668 / -70 lines)


Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 33 additions & 8 deletions
@@ -26,17 +26,34 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
+include = [
+    "/src",
+    "/datafusion",
+    "/LICENSE.txt",
+    "build.rs",
+    "pyproject.toml",
+    "Cargo.toml",
+    "Cargo.lock",
+]

 [features]
 default = ["mimalloc"]
-protoc = [ "datafusion-substrait/protoc" ]
+protoc = ["datafusion-substrait/protoc"]
 substrait = ["dep:datafusion-substrait"]

 [dependencies]
-tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
-pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
-pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
+tokio = { version = "1.45", features = [
+    "macros",
+    "rt",
+    "rt-multi-thread",
+    "sync",
+] }
+pyo3 = { version = "0.24", features = [
+    "extension-module",
+    "abi3",
+    "abi3-py39",
+] }
+pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
 pyo3-log = "0.12.4"
 arrow = { version = "55.1.0", features = ["pyarrow"] }
 datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
@@ -45,15 +62,23 @@ datafusion-proto = { version = "49.0.2" }
 datafusion-ffi = { version = "49.0.2" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
-mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
+mimalloc = { version = "0.1", optional = true, default-features = false, features = [
+    "local_dynamic_tls",
+] }
 async-trait = "0.1.89"
 futures = "0.3"
-object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
+cstr = "0.2"
+object_store = { version = "0.12.3", features = [
+    "aws",
+    "gcp",
+    "azure",
+    "http",
+] }
 url = "2"
 log = "0.4.27"

 [build-dependencies]
-prost-types = "0.13.1" # keep in line with `datafusion-substrait`
+prost-types = "0.13.1"  # keep in line with `datafusion-substrait`
 pyo3-build-config = "0.24"

 [lib]

docs/source/user-guide/dataframe/index.rst

Lines changed: 57 additions & 1 deletion
@@ -145,10 +145,66 @@ To materialize the results of your DataFrame operations:

     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
+objects lazily so you can loop over results directly without importing
+PyArrow:
+
+.. code-block:: python
+
+    for batch in df:
+        ...  # each batch is a ``datafusion.RecordBatch``
+
+Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
+table without collecting everything eagerly:
+
+.. code-block:: python
+
+    import pyarrow as pa
+    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
+
+Asynchronous iteration is supported as well, allowing integration with
+``asyncio`` event loops:
+
+.. code-block:: python
+
+    async for batch in df:
+        ...  # process each batch as it is produced
+
+To work with the stream directly, use
+``to_record_batch_stream()``, which returns a
+:class:`~datafusion.RecordBatchStream`:
+
+.. code-block:: python
+
+    stream = df.to_record_batch_stream()
+    for batch in stream:
+        ...
+
+See :doc:`../io/arrow` for additional details on the Arrow interface.
+
 HTML Rendering
 --------------
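
Since ``async for`` can only appear inside a coroutine, the asynchronous example from the documentation above is not runnable on its own. A minimal, self-contained sketch of the same pattern (the ``total_rows`` helper and the ``from_pydict`` demo data are illustrative, not part of this change):

    import asyncio

    from datafusion import SessionContext


    async def total_rows() -> int:
        ctx = SessionContext()
        df = ctx.from_pydict({"a": list(range(1_000))})
        rows = 0
        # Batches are produced lazily; nothing is collected up front.
        async for batch in df:
            rows += batch.to_pyarrow().num_rows
        return rows


    print(asyncio.run(total_rows()))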

python/datafusion/dataframe.py

Lines changed: 34 additions & 8 deletions
@@ -25,7 +25,9 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    AsyncIterator,
     Iterable,
+    Iterator,
     Literal,
     Optional,
     Union,
@@ -42,7 +44,7 @@
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream

 if TYPE_CHECKING:
     import pathlib
@@ -289,6 +291,9 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.

+    DataFrame objects are iterable; iterating over a DataFrame yields
+    :class:`datafusion.RecordBatch` instances lazily.
+
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """

@@ -305,7 +310,7 @@ def into_view(self) -> pa.Table:
         return self.df.into_view()

     def __getitem__(self, key: str | list[str]) -> DataFrame:
-        """Return a new :py:class`DataFrame` with the specified column or columns.
+        """Return a new :py:class:`DataFrame` with the specified column or columns.

         Args:
             key: Column name or list of column names to select.
@@ -1035,6 +1040,15 @@ def execute_stream_partitioned(self) -> list[RecordBatchStream]:
         streams = self.df.execute_stream_partitioned()
         return [RecordBatchStream(rbs) for rbs in streams]

+    def to_record_batch_stream(self) -> RecordBatchStream:  # noqa: F811
+        """Return a :py:class:`RecordBatchStream` over this DataFrame's results.
+
+        Returns:
+            A ``RecordBatchStream`` representing the lazily generated record
+            batches for this DataFrame.
+        """
+        return self.execute_stream()
+
     def to_pandas(self) -> pd.DataFrame:
         """Execute the :py:class:`DataFrame` and convert it into a Pandas DataFrame.

@@ -1098,21 +1112,33 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.

         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.

         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)

+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Return an iterator over this DataFrame's record batches."""
+        return iter(self.to_record_batch_stream())
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Return an async iterator over this DataFrame's record batches."""
+        return self.to_record_batch_stream().__aiter__()
+
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.

python/datafusion/record_batch.py

Lines changed: 24 additions & 4 deletions
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()

+    def __arrow_c_array__(
+        self, requested_schema: object | None = None
+    ) -> tuple[object, object]:
+        """Export the record batch via the Arrow C Data Interface.
+
+        This allows zero-copy interchange with libraries that support the
+        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
+        CDataInterface/PyCapsuleInterface.html>`_.
+
+        Args:
+            requested_schema: Attempt to provide the record batch using this
+                schema. Only straightforward projections such as column
+                selection or reordering are applied.
+
+        Returns:
+            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
+            ``ArrowSchema``.
+        """
+        return self.record_batch.__arrow_c_array__(requested_schema)
+

 class RecordBatchStream:
     """This class represents a stream of record batches.
@@ -63,19 +83,19 @@ def next(self) -> RecordBatch:
         return next(self)

     async def __anext__(self) -> RecordBatch:
-        """Async iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
         next_batch = await self.rbs.__anext__()
         return RecordBatch(next_batch)

     def __next__(self) -> RecordBatch:
-        """Iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream."""
         next_batch = next(self.rbs)
         return RecordBatch(next_batch)

     def __aiter__(self) -> typing_extensions.Self:
-        """Async iterator function."""
+        """Return an asynchronous iterator over record batches."""
         return self

     def __iter__(self) -> typing_extensions.Self:
-        """Iterator function."""
+        """Return an iterator over record batches."""
         return self
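
Because ``RecordBatch`` now implements ``__arrow_c_array__``, consumers that understand the Arrow PyCapsule protocol can import batches without an explicit ``to_pyarrow()`` call. A minimal sketch, assuming a pyarrow release (14.0 or newer) whose ``pa.record_batch()`` accepts PyCapsule-protocol objects; the demo data is illustrative:

    import pyarrow as pa

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})

    for batch in df:
        # ``batch`` is a ``datafusion.RecordBatch``; pyarrow imports it through
        # the C Data Interface without copying the underlying buffers.
        pa_batch = pa.record_batch(batch)
        print(pa_batch.num_rows)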

python/tests/conftest.py

Lines changed: 10 additions & 1 deletion
@@ -17,7 +17,7 @@

 import pyarrow as pa
 import pytest
-from datafusion import SessionContext
+from datafusion import DataFrame, SessionContext
 from pyarrow.csv import write_csv


@@ -49,3 +49,12 @@ def database(ctx, tmp_path):
         delimiter=",",
         schema_infer_max_records=10,
     )
+
+
+@pytest.fixture
+def fail_collect(monkeypatch):
+    def _fail_collect(self, *args, **kwargs):  # pragma: no cover - failure path
+        msg = "collect should not be called"
+        raise AssertionError(msg)
+
+    monkeypatch.setattr(DataFrame, "collect", _fail_collect)
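
A hypothetical test showing how the ``fail_collect`` fixture might be used: it patches ``DataFrame.collect`` to raise, so the test fails if a streaming code path ever falls back to a full collect. The test name and the use of the existing ``ctx`` fixture are illustrative, not part of this commit:

    def test_iteration_does_not_collect(ctx, fail_collect):
        # Iterating the DataFrame should stream batches; if the implementation
        # called collect(), the patched method would raise AssertionError.
        df = ctx.from_pydict({"a": [1, 2, 3]})
        batches = list(df)
        assert len(batches) >= 1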
