 from typing import (
     TYPE_CHECKING,
     Any,
+    AsyncIterator,
     Iterable,
+    Iterator,
     Literal,
     Optional,
     Union,
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream

 if TYPE_CHECKING:
     import pathlib
@@ -289,6 +291,9 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.

+    DataFrame objects are iterable; iterating over a DataFrame yields
+    :class:`pyarrow.RecordBatch` instances lazily.
+
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """

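A minimal sketch of the new iteration behavior described in the docstring above; the `SessionContext` setup and the query are illustrative, not part of this diff:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a, 2 AS b")  # illustrative query

# Per the docstring above, iteration yields pyarrow.RecordBatch
# objects one at a time, pulled lazily from the execution stream.
for batch in df:
    print(batch.num_rows, batch.schema)
```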
@@ -305,7 +310,7 @@ def into_view(self) -> pa.Table:
305310 return self .df .into_view ()
306311
307312 def __getitem__ (self , key : str | list [str ]) -> DataFrame :
-        """Return a new :py:class`DataFrame` with the specified column or columns.
+        """Return a new :py:class:`DataFrame` with the specified column or columns.

         Args:
             key: Column name or list of column names to select.
@@ -1026,6 +1031,15 @@ def execute_stream(self) -> RecordBatchStream:
         """
         return RecordBatchStream(self.df.execute_stream())

+    def to_record_batch_stream(self) -> RecordBatchStream:
+        """Return a :py:class:`RecordBatchStream` over this DataFrame's results.
+
+        Returns:
+            A ``RecordBatchStream`` representing the lazily generated record
+            batches for this DataFrame.
+        """
+        return self.execute_stream()
+
     def execute_stream_partitioned(self) -> list[RecordBatchStream]:
         """Executes this DataFrame and returns a stream for each partition.

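A short usage sketch for the new `to_record_batch_stream` method, reusing the illustrative `df` from the earlier snippet:

```python
stream = df.to_record_batch_stream()

# RecordBatchStream is itself an iterator, so this is equivalent to
# iterating over the DataFrame directly.
for batch in stream:
    pass  # process each batch as it is produced
```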
@@ -1098,21 +1112,33 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame:
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        The DataFrame is executed using DataFusion's streaming APIs and exposed via
+        Arrow's C Stream interface. Record batches are produced incrementally, so the
+        full result set is never materialized in memory. When ``requested_schema`` is
+        provided, only straightforward projections such as column selection or
+        reordering are applied.

         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.

         Returns:
-            Arrow PyCapsule object.
+            Arrow PyCapsule object representing an ``ArrowArrayStream``.
         """
+        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
+        # ``execute_stream_partitioned`` under the hood to stream batches while
+        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)

+    def __iter__(self) -> Iterator[RecordBatch]:
+        """Return an iterator over this DataFrame's record batches."""
+        return iter(self.to_record_batch_stream())
+
+    def __aiter__(self) -> AsyncIterator[RecordBatch]:
+        """Return an async iterator over this DataFrame's record batches."""
+        return self.to_record_batch_stream().__aiter__()
+
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.

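On the consumer side, any library that understands the Arrow PyCapsule interface can drive `__arrow_c_stream__`. A sketch using pyarrow (assuming pyarrow >= 14, where `RecordBatchReader.from_stream` accepts PyCapsule-stream objects), again with the illustrative `df` from above:

```python
import pyarrow as pa

# pyarrow calls df.__arrow_c_stream__() internally and wraps the
# resulting ArrowArrayStream capsule in a RecordBatchReader.
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    pass  # batches arrive incrementally, mirroring the streaming execution
```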
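And since `__aiter__` delegates to the same `RecordBatchStream`, the DataFrame can be consumed from async code as well; a sketch:

```python
import asyncio

async def consume(df) -> int:
    total = 0
    # Each batch is awaited individually rather than collected up front.
    async for batch in df:
        total += batch.num_rows
    return total

row_count = asyncio.run(consume(df))
```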