Commit c32f08d

UNPICK changes to review
1 parent d348b98 commit c32f08d

File tree

13 files changed: +70, -668 lines

Cargo.lock

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 8 additions & 33 deletions
@@ -26,34 +26,17 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = [
-    "/src",
-    "/datafusion",
-    "/LICENSE.txt",
-    "build.rs",
-    "pyproject.toml",
-    "Cargo.toml",
-    "Cargo.lock",
-]
+include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
 
 [features]
 default = ["mimalloc"]
-protoc = ["datafusion-substrait/protoc"]
+protoc = [ "datafusion-substrait/protoc" ]
 substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
-tokio = { version = "1.45", features = [
-    "macros",
-    "rt",
-    "rt-multi-thread",
-    "sync",
-] }
-pyo3 = { version = "0.24", features = [
-    "extension-module",
-    "abi3",
-    "abi3-py39",
-] }
-pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
+tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
+pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
 pyo3-log = "0.12.4"
 arrow = { version = "55.1.0", features = ["pyarrow"] }
 datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
@@ -62,23 +45,15 @@ datafusion-proto = { version = "49.0.2" }
 datafusion-ffi = { version = "49.0.2" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
-mimalloc = { version = "0.1", optional = true, default-features = false, features = [
-    "local_dynamic_tls",
-] }
+mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
 async-trait = "0.1.89"
 futures = "0.3"
-cstr = "0.2"
-object_store = { version = "0.12.3", features = [
-    "aws",
-    "gcp",
-    "azure",
-    "http",
-] }
+object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
 log = "0.4.27"
 
 [build-dependencies]
-prost-types = "0.13.1" # keep in line with `datafusion-substrait`
+prost-types = "0.13.1" # keep in line with `datafusion-substrait`
 pyo3-build-config = "0.24"
 
 [lib]

docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 57 deletions
@@ -145,66 +145,10 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
    count = df.count()
 
-PyArrow Streaming
------------------
-
-DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
-zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
-Earlier versions eagerly converted the entire DataFrame when exporting to
-PyArrow, which could exhaust memory on large datasets. With streaming, batches
-are produced lazily so you can process arbitrarily large results without
-out-of-memory errors.
-
-.. code-block:: python
-
-    import pyarrow as pa
-
-    # Create a PyArrow RecordBatchReader without materializing all batches
-    reader = pa.RecordBatchReader.from_stream(df)
-    for batch in reader:
-        ...  # process each batch as it is produced
-
-DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
-objects lazily so you can loop over results directly without importing
-PyArrow:
-
-.. code-block:: python
-
-    for batch in df:
-        ...  # each batch is a ``datafusion.RecordBatch``
-
-Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
-table without collecting everything eagerly:
-
-.. code-block:: python
-
-    import pyarrow as pa
-    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
-
-Asynchronous iteration is supported as well, allowing integration with
-``asyncio`` event loops:
-
-.. code-block:: python
-
-    async for batch in df:
-        ...  # process each batch as it is produced
-
-To work with the stream directly, use
-``to_record_batch_stream()``, which returns a
-:class:`~datafusion.RecordBatchStream`:
-
-.. code-block:: python
-
-    stream = df.to_record_batch_stream()
-    for batch in stream:
-        ...
-
-See :doc:`../io/arrow` for additional details on the Arrow interface.
-
 HTML Rendering
 --------------
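
Note on the removal above: the deleted section documented lazy streaming of query results. Even with the DataFrame iteration helpers and to_record_batch_stream() reverted, the pieces this commit leaves in place (execute_stream() on DataFrame, iteration over RecordBatchStream, and RecordBatch.to_pyarrow()) still compose into a streaming loop. A minimal sketch, not part of this commit; the query is illustrative:

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")  # placeholder query; any DataFrame works

    # Stream record batches instead of collecting the whole result at once.
    stream = df.execute_stream()                          # datafusion.RecordBatchStream
    batches = [batch.to_pyarrow() for batch in stream]    # each item is a datafusion.RecordBatch
    table = pa.Table.from_batches(batches)
    print(table.num_rows)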

python/datafusion/dataframe.py

Lines changed: 8 additions & 34 deletions
@@ -25,9 +25,7 @@
 from typing import (
     TYPE_CHECKING,
     Any,
-    AsyncIterator,
     Iterable,
-    Iterator,
     Literal,
     Optional,
     Union,
@@ -44,7 +42,7 @@
 from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatch, RecordBatchStream
+from datafusion.record_batch import RecordBatchStream
 
 if TYPE_CHECKING:
     import pathlib
@@ -291,9 +289,6 @@ def __init__(
 class DataFrame:
     """Two dimensional table representation of data.
 
-    DataFrame objects are iterable; iterating over a DataFrame yields
-    :class:`datafusion.RecordBatch` instances lazily.
-
     See :ref:`user_guide_concepts` in the online documentation for more information.
     """
 
@@ -310,7 +305,7 @@ def into_view(self) -> pa.Table:
         return self.df.into_view()
 
     def __getitem__(self, key: str | list[str]) -> DataFrame:
-        """Return a new :py:class:`DataFrame` with the specified column or columns.
+        """Return a new :py:class`DataFrame` with the specified column or columns.
 
         Args:
             key: Column name or list of column names to select.
@@ -1040,15 +1035,6 @@ def execute_stream_partitioned(self) -> list[RecordBatchStream]:
         streams = self.df.execute_stream_partitioned()
         return [RecordBatchStream(rbs) for rbs in streams]
 
-    def to_record_batch_stream(self) -> RecordBatchStream:  # noqa: F811
-        """Return a :py:class:`RecordBatchStream` over this DataFrame's results.
-
-        Returns:
-            A ``RecordBatchStream`` representing the lazily generated record
-            batches for this DataFrame.
-        """
-        return self.execute_stream()
-
     def to_pandas(self) -> pd.DataFrame:
         """Execute the :py:class:`DataFrame` and convert it into a Pandas DataFrame.
 
@@ -1112,33 +1098,21 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export the DataFrame as an Arrow C Stream.
+        """Export an Arrow PyCapsule Stream.
 
-        The DataFrame is executed using DataFusion's streaming APIs and exposed via
-        Arrow's C Stream interface. Record batches are produced incrementally, so the
-        full result set is never materialized in memory. When ``requested_schema`` is
-        provided, only straightforward projections such as column selection or
-        reordering are applied.
+        This will execute and collect the DataFrame. We will attempt to respect the
+        requested schema, but only trivial transformations will be applied such as only
+        returning the fields listed in the requested schema if their data types match
+        those in the DataFrame.
 
         Args:
             requested_schema: Attempt to provide the DataFrame using this schema.
 
         Returns:
-            Arrow PyCapsule object representing an ``ArrowArrayStream``.
+            Arrow PyCapsule object.
         """
-        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
-        # ``execute_stream_partitioned`` under the hood to stream batches while
-        # preserving the original partition order.
         return self.df.__arrow_c_stream__(requested_schema)
 
-    def __iter__(self) -> Iterator[RecordBatch]:
-        """Return an iterator over this DataFrame's record batches."""
-        return iter(self.to_record_batch_stream())
-
-    def __aiter__(self) -> AsyncIterator[RecordBatch]:
-        """Return an async iterator over this DataFrame's record batches."""
-        return self.to_record_batch_stream().__aiter__()
-
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
         """Apply a function to the current DataFrame which returns another DataFrame.
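
Note on the hunk above: the reverted docstring states that __arrow_c_stream__ executes and collects the DataFrame rather than streaming it incrementally. Callers still consume it through the Arrow PyCapsule interface either way; a hedged sketch of typical consumption (assumes an existing DataFrame df and a PyArrow version providing RecordBatchReader.from_stream, i.e. 15 or newer):

    import pyarrow as pa

    # df is assumed to be an existing datafusion DataFrame; it exposes
    # __arrow_c_stream__, so PyArrow can wrap it via the PyCapsule interface.
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ...  # each item is a pyarrow.RecordBatch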

python/datafusion/record_batch.py

Lines changed: 4 additions & 24 deletions
@@ -46,26 +46,6 @@ def to_pyarrow(self) -> pa.RecordBatch:
         """Convert to :py:class:`pa.RecordBatch`."""
         return self.record_batch.to_pyarrow()
 
-    def __arrow_c_array__(
-        self, requested_schema: object | None = None
-    ) -> tuple[object, object]:
-        """Export the record batch via the Arrow C Data Interface.
-
-        This allows zero-copy interchange with libraries that support the
-        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
-        CDataInterface/PyCapsuleInterface.html>`_.
-
-        Args:
-            requested_schema: Attempt to provide the record batch using this
-                schema. Only straightforward projections such as column
-                selection or reordering are applied.
-
-        Returns:
-            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
-            ``ArrowSchema``.
-        """
-        return self.record_batch.__arrow_c_array__(requested_schema)
-
 
 class RecordBatchStream:
     """This class represents a stream of record batches.
@@ -83,19 +63,19 @@ def next(self) -> RecordBatch:
         return next(self)
 
     async def __anext__(self) -> RecordBatch:
-        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
+        """Async iterator function."""
         next_batch = await self.rbs.__anext__()
         return RecordBatch(next_batch)
 
     def __next__(self) -> RecordBatch:
-        """Return the next :py:class:`RecordBatch` in the stream."""
+        """Iterator function."""
         next_batch = next(self.rbs)
         return RecordBatch(next_batch)
 
     def __aiter__(self) -> typing_extensions.Self:
-        """Return an asynchronous iterator over record batches."""
+        """Async iterator function."""
         return self
 
     def __iter__(self) -> typing_extensions.Self:
-        """Return an iterator over record batches."""
+        """Iterator function."""
         return self
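
Note on the hunk above: with __arrow_c_array__ removed from RecordBatch, the conversion path this file still provides is to_pyarrow(). A small sketch under that assumption (df is an existing DataFrame, not defined in this commit):

    # df is assumed to be an existing datafusion DataFrame.
    stream = df.execute_stream()        # datafusion.RecordBatchStream
    first = next(stream)                # datafusion.RecordBatch
    arrow_batch = first.to_pyarrow()    # explicit conversion to pyarrow.RecordBatch
    print(arrow_batch.num_rows)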

python/tests/conftest.py

Lines changed: 1 addition & 10 deletions
@@ -17,7 +17,7 @@
 
 import pyarrow as pa
 import pytest
-from datafusion import DataFrame, SessionContext
+from datafusion import SessionContext
 from pyarrow.csv import write_csv
 
 
@@ -49,12 +49,3 @@ def database(ctx, tmp_path):
        delimiter=",",
        schema_infer_max_records=10,
    )
-
-
-@pytest.fixture
-def fail_collect(monkeypatch):
-    def _fail_collect(self, *args, **kwargs):  # pragma: no cover - failure path
-        msg = "collect should not be called"
-        raise AssertionError(msg)
-
-    monkeypatch.setattr(DataFrame, "collect", _fail_collect)
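
Note on the hunk above: the deleted fail_collect fixture monkeypatched DataFrame.collect so any eager collection inside a test would raise. A test opted in simply by naming the fixture; a hypothetical usage sketch (the test name and query are illustrative, the ctx session fixture is assumed from this conftest, and the fixture itself is removed by this commit):

    def test_streaming_avoids_collect(fail_collect, ctx):
        # With the fixture active, any call to DataFrame.collect raises AssertionError.
        df = ctx.sql("SELECT 1 AS a")
        batches = list(df.execute_stream())
        assert batches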
