Skip to content

Commit 631e8e7

Browse files
committed
Revert "revert branch UNPICK"
This reverts commit fe80e50.
1 parent fe80e50 commit 631e8e7

File tree

14 files changed

+693
-379
lines changed

14 files changed

+693
-379
lines changed

Cargo.lock

Lines changed: 297 additions & 285 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ readme = "README.md"
2626
license = "Apache-2.0"
2727
edition = "2021"
2828
rust-version = "1.78"
29-
include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
29+
include = ["/src", "/datafusion", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
3030

3131
[features]
3232
default = ["mimalloc"]
@@ -48,6 +48,7 @@ uuid = { version = "1.18", features = ["v4"] }
4848
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
4949
async-trait = "0.1.89"
5050
futures = "0.3"
51+
rayon = "1.10"
5152
object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
5253
url = "2"
5354
log = "0.4.27"

benchmarks/collect_gil_bench.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import time
2+
3+
import pyarrow as pa
4+
from datafusion import SessionContext
5+
6+
7+
def run(n_batches: int = 8, batch_size: int = 1_000_000) -> None:
8+
ctx = SessionContext()
9+
batches = []
10+
for i in range(n_batches):
11+
start = i * batch_size
12+
arr = pa.array(range(start, start + batch_size))
13+
batches.append(pa.record_batch([arr], names=["a"]))
14+
15+
df = ctx.create_dataframe([batches])
16+
17+
start = time.perf_counter()
18+
df.collect()
19+
duration = time.perf_counter() - start
20+
print(f"{n_batches} batches collected in {duration:.3f}s")
21+
22+
23+
if __name__ == "__main__":
24+
run()

docs/source/user-guide/configuration.rst

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,26 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
4747
print(ctx)
4848
4949
50+
.. _target_partitions:
51+
52+
Target partitions and threads
53+
-----------------------------
54+
55+
The :py:meth:`~datafusion.context.SessionConfig.with_target_partitions` method
56+
controls how many partitions DataFusion uses when executing a query. Each
57+
partition is processed on its own thread, so this setting effectively limits
58+
the number of threads that will be scheduled.
59+
60+
For most workloads a good starting value is the number of logical CPU cores on
61+
your machine. You can use :func:`os.cpu_count` to automatically configure this::
62+
63+
import os
64+
config = SessionConfig().with_target_partitions(os.cpu_count())
65+
66+
Choosing a value significantly higher than the available cores can lead to
67+
excessive context switching without performance gains, while a much lower value
68+
may underutilize the machine.
69+
70+
5071
You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
5172
and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# RecordBatch conversion and the GIL
2+
3+
Profiling `DataFrame.collect` showed that converting each `RecordBatch` to
4+
PyArrow via `rb.to_pyarrow(py)` spent considerable time holding the Python GIL.
5+
Using `py-spy` on a query returning many batches indicated that more than
6+
95 % of the conversion executed while the GIL was held, meaning the work was
7+
effectively serialised.
8+
For queries that return many batches this limited CPU utilisation because only
9+
one conversion could run at a time.
10+
11+
The implementation now converts each batch to Arrow's C data (schema/array)
12+
while the GIL is released, acquiring the GIL only to wrap those pointers into
13+
PyArrow objects. This allows the CPU intensive portions of the conversion to
14+
run fully in parallel.
15+
16+
A simple benchmark is provided in `benchmarks/collect_gil_bench.py`.
17+
Run it twice to compare serial and parallel conversions:
18+
19+
```bash
20+
RAYON_NUM_THREADS=1 python benchmarks/collect_gil_bench.py # serial
21+
python benchmarks/collect_gil_bench.py # parallel
22+
```
23+
24+
On this container, collecting 128 1 M‑row batches took around 1.5 s with
25+
`RAYON_NUM_THREADS=1` and 0.8 s with the default thread pool, demonstrating
26+
that releasing the GIL allows conversions to run in parallel.

docs/source/user-guide/dataframe/index.rst

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ The ``DataFrame`` class is the core abstraction in DataFusion that represents ta
2525
on that data. DataFrames provide a flexible API for transforming data through various operations such as
2626
filtering, projection, aggregation, joining, and more.
2727

28-
A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
29-
terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called.
28+
A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
29+
terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called. ``collect()`` loads
30+
all record batches into Python memory; for large results you may want to stream data instead using
31+
``execute_stream()`` or ``__arrow_c_stream__()``.
3032

3133
Creating DataFrames
3234
-------------------
@@ -128,27 +130,47 @@ DataFusion's DataFrame API offers a wide range of operations:
128130
129131
Terminal Operations
130132
-------------------
131-
132-
To materialize the results of your DataFrame operations:
133+
``collect()`` materializes every record batch in Python. While convenient, this
134+
eagerly loads the full result set into memory and can overwhelm the Python
135+
process for large queries. Alternatives that stream data from Rust avoid this
136+
memory growth:
133137

134138
.. code-block:: python
135139
136-
# Collect all data as PyArrow RecordBatches
140+
# Collect all data as PyArrow RecordBatches (loads entire result set)
137141
result_batches = df.collect()
138-
139-
# Convert to various formats
142+
143+
# Stream batches using the native API
144+
stream = df.execute_stream()
145+
for batch in stream:
146+
... # process each RecordBatch
147+
148+
# Stream via the Arrow C Data Interface
149+
import pyarrow as pa
150+
reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
151+
for batch in reader:
152+
...
153+
154+
# Convert to various formats (also load all data into memory)
140155
pandas_df = df.to_pandas() # Pandas DataFrame
141156
polars_df = df.to_polars() # Polars DataFrame
142157
arrow_table = df.to_arrow_table() # PyArrow Table
143158
py_dict = df.to_pydict() # Python dictionary
144159
py_list = df.to_pylist() # Python list of dictionaries
145-
160+
146161
# Display results
147162
df.show() # Print tabular format to console
148-
163+
149164
# Count rows
150165
count = df.count()
151166
167+
For large outputs, prefer engine-level writers such as ``df.write_parquet()``
168+
or other DataFusion writers. These stream data directly to the destination and
169+
avoid buffering the entire dataset in Python.
170+
171+
For more on parallel record batch conversion and the Python GIL, see
172+
:doc:`collect-gil`.
173+
152174
HTML Rendering
153175
--------------
154176

@@ -207,3 +229,4 @@ For a complete list of available functions, see the :py:mod:`datafusion.function
207229
:maxdepth: 1
208230

209231
rendering
232+
collect-gil

docs/source/user-guide/io/arrow.rst

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,34 @@ and returns a ``StructArray``. Common pyarrow sources you can use are:
5757
Exporting from DataFusion
5858
-------------------------
5959

60-
DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
61-
Python library that accepts these can import a DataFusion DataFrame directly.
60+
DataFusion DataFrames implement ``__arrow_c_stream__`` so any Python library
61+
that accepts this interface can import a DataFusion ``DataFrame`` directly.
6262

63-
.. warning::
64-
It is important to note that this will cause the DataFrame execution to happen, which may be
65-
a time consuming task. That is, you will cause a
66-
:py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.
63+
``collect()`` or ``pa.table(df)`` will materialize every record batch in
64+
Python. For large results this can quickly exhaust memory. Instead, stream the
65+
output incrementally:
6766

67+
.. ipython:: python
68+
69+
# Stream batches with DataFusion's native API
70+
stream = df.execute_stream()
71+
for batch in stream:
72+
... # process each RecordBatch as it arrives
73+
74+
.. ipython:: python
75+
76+
# Expose a C stream that PyArrow can consume lazily
77+
import pyarrow as pa
78+
reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
79+
for batch in reader:
80+
... # process each batch without buffering the entire table
81+
82+
If the goal is simply to persist results, prefer engine-level writers such as
83+
``df.write_parquet()``. These writers stream data from Rust directly to the
84+
destination and avoid Python-side memory growth.
6885

6986
.. ipython:: python
7087
7188
df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
72-
pa.table(df)
89+
pa.table(df) # loads all batches into memory
7390

python/datafusion/context.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,13 @@ def with_batch_size(self, batch_size: int) -> SessionConfig:
161161
def with_target_partitions(self, target_partitions: int) -> SessionConfig:
162162
"""Customize the number of target partitions for query execution.
163163
164-
Increasing partitions can increase concurrency.
164+
Each partition is processed on its own thread, so this value controls
165+
the degree of parallelism. A good starting point is the number of
166+
logical CPU cores on your machine, for example
167+
``SessionConfig().with_target_partitions(os.cpu_count())``.
168+
169+
See the :ref:`configuration guide <target_partitions>` for more
170+
discussion on choosing a value.
165171
166172
Args:
167173
target_partitions: Number of target partitions.

python/tests/test_dataframe.py

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import re
2121
import threading
2222
import time
23+
import tracemalloc
2324
from typing import Any
2425

2526
import pyarrow as pa
@@ -252,13 +253,6 @@ def test_filter(df):
252253
assert result.column(2) == pa.array([5])
253254

254255

255-
def test_show_empty(df, capsys):
256-
df_empty = df.filter(column("a") > literal(3))
257-
df_empty.show()
258-
captured = capsys.readouterr()
259-
assert "DataFrame has no rows" in captured.out
260-
261-
262256
def test_sort(df):
263257
df = df.sort(column("b").sort(ascending=False))
264258

@@ -1390,6 +1384,27 @@ def test_collect_partitioned():
13901384
assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()
13911385

13921386

1387+
def test_collect_multiple_batches_to_pyarrow():
1388+
ctx = SessionContext()
1389+
1390+
batch1 = pa.RecordBatch.from_arrays(
1391+
[pa.array([1, 2])],
1392+
names=["a"],
1393+
)
1394+
batch2 = pa.RecordBatch.from_arrays(
1395+
[pa.array([3, 4])],
1396+
names=["a"],
1397+
)
1398+
1399+
df = ctx.create_dataframe([[batch1], [batch2]])
1400+
1401+
batches = df.collect()
1402+
1403+
assert len(batches) == 2
1404+
table = pa.Table.from_batches(batches)
1405+
assert table.column("a").to_pylist() == [1, 2, 3, 4]
1406+
1407+
13931408
def test_union(ctx):
13941409
batch = pa.RecordBatch.from_arrays(
13951410
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
@@ -1470,6 +1485,24 @@ def test_empty_to_pandas(df):
14701485
assert set(pandas_df.columns) == {"a", "b", "c"}
14711486

14721487

1488+
def test_show_no_batches(capsys):
1489+
"""Ensure showing a query with no batches still prints headers."""
1490+
ctx = SessionContext()
1491+
df = ctx.sql("SELECT 1 AS a WHERE 1=0")
1492+
df.show()
1493+
captured = capsys.readouterr()
1494+
assert "| a |" in captured.out
1495+
assert "Empty DataFrame" not in captured.out
1496+
1497+
1498+
def test_show_empty_dataframe(df, capsys):
1499+
"""Ensure showing an empty DataFrame prints a helpful message."""
1500+
empty_df = df.limit(0)
1501+
empty_df.show()
1502+
captured = capsys.readouterr()
1503+
assert "Empty DataFrame" in captured.out
1504+
1505+
14731506
def test_to_polars(df):
14741507
# Skip test if polars is not installed
14751508
pl = pytest.importorskip("polars")
@@ -1574,6 +1607,23 @@ async def test_execute_stream_partitioned_async(df):
15741607
assert not remaining_batches
15751608

15761609

1610+
def test_arrow_c_stream_streaming(large_df):
1611+
df = large_df.repartition(4)
1612+
capsule = df.__arrow_c_stream__()
1613+
ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
1614+
ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
1615+
ptr = ctypes.pythonapi.PyCapsule_GetPointer(capsule, b"arrow_array_stream")
1616+
reader = pa.RecordBatchReader._import_from_c(ptr)
1617+
1618+
tracemalloc.start()
1619+
batch_count = sum(1 for _ in reader)
1620+
_current, peak = tracemalloc.get_traced_memory()
1621+
tracemalloc.stop()
1622+
1623+
assert batch_count > 1
1624+
assert peak < 50 * MB
1625+
1626+
15771627
def test_empty_to_arrow_table(df):
15781628
# Convert empty datafusion dataframe to pyarrow Table
15791629
pyarrow_table = df.limit(0).to_arrow_table()
@@ -2664,19 +2714,3 @@ def trigger_interrupt():
26642714

26652715
# Make sure the interrupt thread has finished
26662716
interrupt_thread.join(timeout=1.0)
2667-
2668-
2669-
def test_show_select_where_no_rows(capsys) -> None:
2670-
ctx = SessionContext()
2671-
df = ctx.sql("SELECT 1 WHERE 1=0")
2672-
df.show()
2673-
out = capsys.readouterr().out
2674-
assert "DataFrame has no rows" in out
2675-
2676-
2677-
def test_show_from_empty_batch(capsys) -> None:
2678-
ctx = SessionContext()
2679-
batch = pa.record_batch([pa.array([], type=pa.int32())], names=["a"])
2680-
ctx.create_dataframe([[batch]]).show()
2681-
out = capsys.readouterr().out
2682-
assert "| a |" in out
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import pytest
2+
3+
4+
def test_record_batch_stream_next(ctx):
5+
stream = ctx.sql("SELECT 1 as a").execute_stream()
6+
batch = next(stream)
7+
assert batch.to_pyarrow().num_rows == 1
8+
with pytest.raises(StopIteration):
9+
next(stream)
10+
11+
12+
@pytest.mark.asyncio
13+
async def test_record_batch_stream_anext(ctx):
14+
stream = ctx.sql("SELECT 1 as a").execute_stream()
15+
batch = await stream.__anext__()
16+
assert batch.to_pyarrow().num_rows == 1
17+
with pytest.raises(StopAsyncIteration):
18+
await stream.__anext__()

0 commit comments

Comments
 (0)