|
25 | 25 | from typing import ( |
26 | 26 | TYPE_CHECKING, |
27 | 27 | Any, |
| 28 | + AsyncIterator, |
28 | 29 | Iterable, |
| 30 | + Iterator, |
29 | 31 | Literal, |
30 | 32 | Optional, |
31 | 33 | Union, |
|
42 | 44 | from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal |
43 | 45 | from datafusion.expr import Expr, SortExpr, sort_or_default |
44 | 46 | from datafusion.plan import ExecutionPlan, LogicalPlan |
45 | | -from datafusion.record_batch import RecordBatchStream |
| 47 | +from datafusion.record_batch import ( |
| 48 | + RecordBatch, |
| 49 | + RecordBatchStream, |
| 50 | + to_record_batch_stream, |
| 51 | +) |
46 | 52 |
|
47 | 53 | if TYPE_CHECKING: |
48 | 54 | import pathlib |
|
53 | 59 | import pyarrow as pa |
54 | 60 |
|
55 | 61 | from datafusion._internal import expr as expr_internal |
56 | 63 |
|
57 | 64 | from enum import Enum |
58 | 65 |
|
@@ -289,6 +296,9 @@ def __init__( |
289 | 296 | class DataFrame: |
290 | 297 | """Two dimensional table representation of data. |
291 | 298 |
|
| 299 | + DataFrame objects are iterable; iterating over a DataFrame yields |
| 300 | + :class:`pyarrow.RecordBatch` instances lazily. |
| 301 | +
|
292 | 302 | See :ref:`user_guide_concepts` in the online documentation for more information. |
293 | 303 | """ |
294 | 304 |
|
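To make the new docstring concrete, here is a minimal sketch of lazy iteration over a DataFrame. The `SessionContext`/`from_pydict` setup and the sample data are illustrative assumptions, not part of this change:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})  # sample data (assumption)

# Batches are produced lazily as the plan executes; nothing is collected up front.
for batch in df:
    print(batch.num_rows)  # each `batch` is a pyarrow.RecordBatch
```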
@@ -1018,6 +1028,22 @@ def to_arrow_table(self) -> pa.Table: |
1018 | 1028 | """ |
1019 | 1029 | return self.df.to_arrow_table() |
1020 | 1030 |
|
| 1031 | + def __iter__(self) -> Iterator[pa.RecordBatch]: |
| 1032 | + """Iterate over :py:class:`pyarrow.RecordBatch` objects. |
| 1033 | +
|
| 1034 | + This executes the DataFrame and yields each resulting record batch as a |
| 1035 | + native :py:class:`pyarrow.RecordBatch`. |
| 1036 | +
|
| 1037 | + Yields: |
| 1038 | + pyarrow.RecordBatch: the next batch in the result stream. |
| 1039 | + """ |
| 1040 | + for batch in self.execute_stream(): |
| 1041 | + # ``execute_stream`` yields batches that may be ``RecordBatch`` |
| 1042 | + # wrappers or ``pyarrow.RecordBatch`` objects directly. Convert |
| 1043 | + # to native PyArrow batches when necessary to provide a consistent |
| 1044 | + # iterator interface. |
| 1045 | + yield batch.to_pyarrow() if hasattr(batch, "to_pyarrow") else batch |
| 1046 | + |
1021 | 1047 | def execute_stream(self) -> RecordBatchStream: |
1022 | 1048 | """Executes this DataFrame and returns a stream over a single partition. |
1023 | 1049 |
|
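For reference, the loop added above is roughly equivalent to driving `execute_stream()` by hand. A sketch of that spelled-out form, under the assumption that the stream yields DataFusion `RecordBatch` wrappers exposing `to_pyarrow()` (as the new `__iter__` anticipates); the context setup and sample data are again assumptions:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": list(range(10))})  # sample data (assumption)

# Roughly what the new __iter__ does: pull wrapper batches off the stream and
# convert each one to a native pyarrow batch as it arrives.
total_rows = 0
for wrapper in df.execute_stream():
    batch = wrapper.to_pyarrow()  # pyarrow.RecordBatch
    total_rows += batch.num_rows
print(total_rows)
```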
@@ -1098,21 +1124,41 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram |
1098 | 1124 | return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls)) |
1099 | 1125 |
|
1100 | 1126 | def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: |
1101 | | - """Export an Arrow PyCapsule Stream. |
| 1127 | + """Export the DataFrame as an Arrow C Stream. |
1102 | 1128 |
|
1103 | | - This will execute and collect the DataFrame. We will attempt to respect the |
1104 | | - requested schema, but only trivial transformations will be applied such as only |
1105 | | - returning the fields listed in the requested schema if their data types match |
1106 | | - those in the DataFrame. |
| 1129 | + The DataFrame is executed using DataFusion's streaming APIs and exposed via |
| 1130 | + Arrow's C Stream interface. Record batches are produced incrementally, so the |
| 1131 | + full result set is never materialized in memory. When ``requested_schema`` is |
| 1132 | + provided, only straightforward projections such as column selection or |
| 1133 | + reordering are applied. |
1107 | 1134 |
|
1108 | 1135 | Args: |
1109 | 1136 | requested_schema: Attempt to provide the DataFrame using this schema. |
1110 | 1137 |
|
1111 | 1138 | Returns: |
1112 | | - Arrow PyCapsule object. |
| 1139 | + Arrow PyCapsule object representing an ``ArrowArrayStream``. |
1113 | 1140 | """ |
| 1141 | + # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages |
| 1142 | + # ``execute_stream_partitioned`` under the hood to stream batches while |
| 1143 | + # preserving the original partition order. |
1114 | 1144 | return self.df.__arrow_c_stream__(requested_schema) |
1115 | 1145 |
|
| 1154 | + def __aiter__(self) -> AsyncIterator[RecordBatch]: |
| 1155 | + """Asynchronously yield record batches from the DataFrame. |
| 1156 | +
|
| 1157 | + This delegates to :func:`to_record_batch_stream` to obtain a |
| 1158 | + :class:`RecordBatchStream` and returns its asynchronous iterator. |
| 1159 | + """ |
| 1160 | + return to_record_batch_stream(self).__aiter__() |
| 1161 | + |
1116 | 1162 | def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame: |
1117 | 1163 | """Apply a function to the current DataFrame which returns another DataFrame. |
1118 | 1164 |
|
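Two usage sketches for the surfaces this hunk touches. The `SessionContext`/`from_pydict` setup and sample data are assumptions, the PyCapsule consumer needs a pyarrow release that understands the Arrow C stream protocol (roughly 14+), and whether the async loop runs cleanly under `asyncio.run` depends on the `RecordBatchStream` async bindings this change builds on:

```python
import asyncio

import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3]})  # sample data (assumption)

# Consume __arrow_c_stream__ incrementally via pyarrow's PyCapsule support
# (pyarrow 14+); pa.table(df) is the one-shot alternative when full
# materialization is acceptable.
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    print(batch.num_rows)  # one pyarrow.RecordBatch at a time


async def consume(frame):
    # Relies on the __aiter__ added here; each item is a DataFusion RecordBatch
    # wrapper, so convert with to_pyarrow() when a native batch is needed.
    async for wrapped in frame:
        print(wrapped.to_pyarrow().num_rows)


asyncio.run(consume(ctx.from_pydict({"a": [4, 5, 6]})))
```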
|