Commit 569e554

Revert "UNPICK changes to review"
This reverts commit 9e74c83.
1 parent 9e74c83 commit 569e554

File tree: 14 files changed, +1039 -151 lines

docs/source/conf.py

Lines changed: 8 additions & 0 deletions
@@ -72,6 +72,14 @@
 suppress_warnings = ["autoapi.python_import_resolution"]
 autoapi_python_class_content = "both"
 autoapi_keep_files = False  # set to True for debugging generated files
+autoapi_options = [
+    "members",
+    "undoc-members",
+    "special-members",
+    "show-inheritance",
+    "show-module-summary",
+    "imported-members",
+]


 def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool:  # noqa: ARG001

docs/source/user-guide/dataframe/index.rst

Lines changed: 35 additions & 1 deletion
@@ -145,10 +145,44 @@ To materialize the results of your DataFrame operations:

     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+Note that streams retain the originating ``SessionContext`` internally, so the
+context can be safely dropped once the stream has been obtained.
+
+DataFrames are also iterable, yielding :class:`datafusion.RecordBatch` objects
+that implement the Arrow C data interface. These batches can be consumed by
+libraries like PyArrow without copying:
+
+.. code-block:: python
+
+    for batch in df:
+        pa_batch = batch.to_pyarrow()  # optional conversion
+        ...  # process each batch as it is produced
+
+See :doc:`../io/arrow` for additional details on the Arrow interface.
+
 HTML Rendering
 --------------

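The documentation above covers synchronous consumption. As a rough companion sketch (not part of this commit), the asynchronous iteration added in python/datafusion/dataframe.py below could be exercised like this; the in-memory table and column name are illustrative:

import asyncio

from datafusion import SessionContext


async def main() -> None:
    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})  # hypothetical sample data
    async for batch in df:  # batches arrive lazily as pyarrow.RecordBatch objects
        print(batch.num_rows)


asyncio.run(main())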

python/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -53,7 +53,7 @@
 )
 from .io import read_avro, read_csv, read_json, read_parquet
 from .plan import ExecutionPlan, LogicalPlan
-from .record_batch import RecordBatch, RecordBatchStream
+from .record_batch import RecordBatch, RecordBatchStream, to_record_batch_stream
 from .user_defined import (
     Accumulator,
     AggregateUDF,
@@ -107,6 +107,7 @@
     "read_json",
     "read_parquet",
     "substrait",
+    "to_record_batch_stream",
     "udaf",
     "udf",
     "udtf",

python/datafusion/dataframe.py

Lines changed: 43 additions & 7 deletions
@@ -25,7 +25,9 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    AsyncIterator,
     Iterable,
+    Iterator,
     Literal,
     Optional,
     Union,
@@ -42,7 +44,11 @@
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import (
+    RecordBatch,
+    RecordBatchStream,
+    to_record_batch_stream,
+)

 if TYPE_CHECKING:
     import pathlib
@@ -53,6 +59,7 @@
     import pyarrow as pa

     from datafusion._internal import expr as expr_internal
+    from datafusion.record_batch import RecordBatch

 from enum import Enum

@@ -289,6 +296,9 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.

+    DataFrame objects are iterable; iterating over a DataFrame yields
+    :class:`pyarrow.RecordBatch` instances lazily.
+
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """

@@ -1098,21 +1108,47 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.
+
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The returned capsule holds a reference to the originating
+        :class:`SessionContext`, keeping it alive until the stream is fully
+        consumed. This makes it safe to drop the original context after obtaining
+        the stream.

         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.

         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)

+    def __iter__(self) -> Iterator[pa.RecordBatch]:
+        """Iterate over :class:`pyarrow.RecordBatch` objects.
+
+        Results are streamed without materializing the full DataFrame. This
+        implementation delegates to :func:`to_record_batch_stream`, which executes
+        the :class:`DataFrame` and returns a :class:`RecordBatchStream`.
+        """
+        return to_record_batch_stream(self).__iter__()
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Asynchronously yield record batches from the DataFrame.
+
+        This delegates to :func:`to_record_batch_stream` to obtain a
+        :class:`RecordBatchStream` and returns its asynchronous iterator.
+        """
+        return to_record_batch_stream(self).__aiter__()
+
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.

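A minimal sketch of the capsule-lifetime behaviour the new ``__arrow_c_stream__`` docstring describes (not part of this commit; the sample data is illustrative). ``pa.RecordBatchReader.from_stream`` consumes the Arrow C stream, and per the docstring the stream keeps the ``SessionContext`` alive:

import pyarrow as pa

from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"x": [1, 2, 3]})  # hypothetical sample data
reader = pa.RecordBatchReader.from_stream(df)  # grabs the Arrow C stream capsule
del ctx, df  # per the docstring, the stream retains the context internally
for batch in reader:
    print(batch.num_rows)  # batches are produced incrementally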

python/datafusion/record_batch.py

Lines changed: 23 additions & 9 deletions
@@ -25,11 +25,13 @@

 from typing import TYPE_CHECKING

+import datafusion._internal as df_internal
+
 if TYPE_CHECKING:
     import pyarrow as pa
     import typing_extensions

-    import datafusion._internal as df_internal
+    from datafusion.dataframe import DataFrame


 class RecordBatch:
@@ -58,19 +60,19 @@ def __init__(self, record_batch_stream: df_internal.RecordBatchStream) -> None:
         """This constructor is typically not called by the end user."""
         self.rbs = record_batch_stream

-    def next(self) -> RecordBatch:
-        """See :py:func:`__next__` for the iterator function."""
+    def next(self) -> pa.RecordBatch:
+        """Retrieve the next :py:class:`pa.RecordBatch`."""
         return next(self)

-    async def __anext__(self) -> RecordBatch:
-        """Async iterator function."""
+    async def __anext__(self) -> pa.RecordBatch:
+        """Async iterator returning :py:class:`pa.RecordBatch`."""
         next_batch = await self.rbs.__anext__()
-        return RecordBatch(next_batch)
+        return next_batch.to_pyarrow()

-    def __next__(self) -> RecordBatch:
-        """Iterator function."""
+    def __next__(self) -> pa.RecordBatch:
+        """Iterator returning :py:class:`pa.RecordBatch`."""
         next_batch = next(self.rbs)
-        return RecordBatch(next_batch)
+        return next_batch.to_pyarrow()

     def __aiter__(self) -> typing_extensions.Self:
         """Async iterator function."""
@@ -79,3 +81,15 @@ def __aiter__(self) -> typing_extensions.Self:
     def __iter__(self) -> typing_extensions.Self:
         """Iterator function."""
         return self
+
+
+def to_record_batch_stream(df: DataFrame) -> RecordBatchStream:
+    """Convert a DataFrame into a RecordBatchStream.
+
+    Args:
+        df: DataFrame to convert.
+
+    Returns:
+        A RecordBatchStream representing the DataFrame.
+    """
+    return df.execute_stream()
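For illustration only (not part of the commit), the new ``to_record_batch_stream`` helper could be used directly; the sample data is hypothetical:

from datafusion import SessionContext, to_record_batch_stream

ctx = SessionContext()
df = ctx.from_pydict({"n": list(range(10))})  # hypothetical sample data
stream = to_record_batch_stream(df)  # thin wrapper over df.execute_stream()
for batch in stream:  # __next__ now yields pyarrow.RecordBatch objects
    print(batch.num_rows)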

python/tests/conftest.py

Lines changed: 10 additions & 1 deletion
@@ -17,7 +17,7 @@

 import pyarrow as pa
 import pytest
-from datafusion import SessionContext
+from datafusion import DataFrame, SessionContext
 from pyarrow.csv import write_csv


@@ -49,3 +49,12 @@ def database(ctx, tmp_path):
         delimiter=",",
         schema_infer_max_records=10,
     )
+
+
+@pytest.fixture
+def fail_collect(monkeypatch):
+    def _fail_collect(self, *args, **kwargs):  # pragma: no cover - failure path
+        msg = "collect should not be called"
+        raise AssertionError(msg)
+
+    monkeypatch.setattr(DataFrame, "collect", _fail_collect)
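A hypothetical test (not included in this commit) showing how the ``fail_collect`` fixture might be used to assert that iteration goes through the streaming path rather than ``DataFrame.collect``:

from datafusion import SessionContext


def test_iter_streams_without_collect(fail_collect) -> None:
    # collect() is monkeypatched to raise, so this test passes only if
    # iteration uses the streaming path.
    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})  # hypothetical sample data
    batches = list(df)
    assert sum(b.num_rows for b in batches) == 3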
