Skip to content

Commit 73e1b06

Browse files
committed
Add range method to SessionContext and iterator support to DataFrame for improved data handling
1 parent 129a7ae commit 73e1b06

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

python/datafusion/context.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,37 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
731731
"""
732732
return DataFrame(self.ctx.from_polars(data, name))
733733

734+
def range(
    self,
    start: int,
    stop: int | None = None,
    step: int = 1,
    partitions: int | None = None,
) -> DataFrame:
    """Create a DataFrame containing a sequence of numbers.

    This is backed by DataFusion's ``range`` table function, which generates
    values lazily and therefore does not materialize the full range in
    memory. When ``stop`` is omitted, ``start`` is treated as the stop value
    and the sequence begins at zero, mirroring Python's built-in ``range``.

    Args:
        start: Starting value for the sequence or the exclusive stop if
            ``stop`` is ``None``.
        stop: Exclusive upper bound of the sequence.
        step: Increment between successive values. Must be non-zero.
        partitions: Optional number of partitions for the generated data.

    Returns:
        DataFrame yielding the requested range of values.

    Raises:
        ValueError: If ``step`` is zero, matching the behavior of Python's
            built-in ``range``.
    """
    if int(step) == 0:
        # Fail fast with a clear Python-level error instead of an opaque
        # engine-side failure from the underlying range() table function.
        raise ValueError("range() arg 3 must not be zero")
    if stop is None:
        # Single-argument form: range(n) is equivalent to range(0, n).
        start, stop = 0, start

    # Every interpolated value is coerced through int(), so the f-string
    # built SQL cannot carry an injection payload.
    parts = f", {int(partitions)}" if partitions is not None else ""
    sql = f"SELECT * FROM range({int(start)}, {int(stop)}, {int(step)}{parts})"  # noqa: S608
    return self.sql(sql)
764+
734765
# https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
735766
# is the discussion on how we arrived at adding register_view
736767
def register_view(self, name: str, df: DataFrame) -> None:

python/datafusion/dataframe.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
TYPE_CHECKING,
2727
Any,
2828
Iterable,
29+
Iterator,
2930
Literal,
3031
Optional,
3132
Union,
@@ -1116,6 +1117,19 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
11161117
# ``execute_stream`` under the hood to stream batches one at a time.
11171118
return self.df.__arrow_c_stream__(requested_schema)
11181119

1120+
def __iter__(self) -> Iterator[pa.RecordBatch]:
    """Yield record batches from the DataFrame without materializing results.

    Record batches are streamed through the Arrow C Stream interface
    (``__arrow_c_stream__``), allowing callers such as
    :func:`pyarrow.Table.from_batches` to consume results lazily. The
    DataFrame is executed using DataFusion's streaming APIs so ``collect``
    is never invoked.
    """
    import pyarrow as pa

    # ``RecordBatchReader.from_stream`` consumes any object implementing
    # the Arrow PyCapsule stream protocol (``__arrow_c_stream__``). The
    # previous ``_import_from_c`` call is a private API that expects a raw
    # integer pointer address, not the PyCapsule that
    # ``__arrow_c_stream__()`` returns, and raises TypeError at runtime.
    reader = pa.RecordBatchReader.from_stream(self)
    yield from reader
1132+
11191133
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11201134
"""Apply a function to the current DataFrame which returns another DataFrame.
11211135

0 commit comments

Comments
 (0)