
Commit 5a77c8f

Revert "revert branch UNPICK"
This reverts commit a63d083.
1 parent a63d083 commit 5a77c8f

File tree

15 files changed (+415, -146 lines)


AGENTS.md

Lines changed: 0 additions & 53 deletions
This file was deleted.

Cargo.lock

Lines changed: 20 additions & 0 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
 async-trait = "0.1.89"
 futures = "0.3"
+rayon = "1.10"
 object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
 log = "0.4.27"

benchmarks/collect_gil_bench.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+import time
+
+import pyarrow as pa
+from datafusion import SessionContext
+
+
+def run(n_batches: int = 8, batch_size: int = 1_000_000) -> None:
+    ctx = SessionContext()
+    batches = []
+    for i in range(n_batches):
+        start = i * batch_size
+        arr = pa.array(range(start, start + batch_size))
+        batches.append(pa.record_batch([arr], names=["a"]))
+
+    df = ctx.create_dataframe([batches])
+
+    start = time.perf_counter()
+    df.collect()
+    duration = time.perf_counter() - start
+    print(f"{n_batches} batches collected in {duration:.3f}s")
+
+
+if __name__ == "__main__":
+    run()

docs/source/user-guide/configuration.rst

Lines changed: 21 additions & 0 deletions
@@ -47,5 +47,26 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
     print(ctx)
 
 
+.. _target_partitions:
+
+Target partitions and threads
+-----------------------------
+
+The :py:meth:`~datafusion.context.SessionConfig.with_target_partitions` method
+controls how many partitions DataFusion uses when executing a query. Each
+partition is processed on its own thread, so this setting effectively limits
+the number of threads that will be scheduled.
+
+For most workloads a good starting value is the number of logical CPU cores on
+your machine. You can use :func:`os.cpu_count` to automatically configure this::
+
+    import os
+    config = SessionConfig().with_target_partitions(os.cpu_count())
+
+Choosing a value significantly higher than the available cores can lead to
+excessive context switching without performance gains, while a much lower value
+may underutilize the machine.
+
+
 You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
 and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
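The guidance above (use the logical core count, cap excessive values) can be wrapped in a small helper before passing the result to ``with_target_partitions``. This is a stdlib-only sketch; the helper name ``pick_target_partitions`` is hypothetical, not part of the DataFusion API:

```python
import os
from typing import Optional


def pick_target_partitions(requested: Optional[int] = None) -> int:
    """Choose a partition count for SessionConfig.with_target_partitions.

    Hypothetical helper: uses the caller's explicit value when given,
    otherwise falls back to the logical CPU core count (or 1 when the
    core count cannot be detected).
    """
    if requested is not None:
        if requested < 1:
            raise ValueError("target_partitions must be >= 1")
        return requested
    return os.cpu_count() or 1
```

It would then be used as ``SessionConfig().with_target_partitions(pick_target_partitions())``, keeping the auto-detection logic in one place.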
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+# RecordBatch conversion and the GIL
+
+Profiling `DataFrame.collect` showed that converting each `RecordBatch` to
+PyArrow via `rb.to_pyarrow(py)` spent considerable time holding the Python GIL.
+Using `py-spy` on a query returning many batches indicated that more than
+95% of the conversion executed while the GIL was held, meaning the work was
+effectively serialised.
+For queries that return many batches this limited CPU utilisation because only
+one conversion could run at a time.
+
+The implementation now converts each batch to Arrow's C data (schema/array)
+while the GIL is released, acquiring the GIL only to wrap those pointers into
+PyArrow objects. This allows the CPU-intensive portions of the conversion to
+run fully in parallel.
+
+A simple benchmark is provided in `benchmarks/collect_gil_bench.py`.
+Run it twice to compare serial and parallel conversions:
+
+```bash
+RAYON_NUM_THREADS=1 python benchmarks/collect_gil_bench.py  # serial
+python benchmarks/collect_gil_bench.py                      # parallel
+```
+
+On this container, collecting 128 1M-row batches took around 1.5 s with
+`RAYON_NUM_THREADS=1` and 0.8 s with the default thread pool, demonstrating
+that releasing the GIL allows conversions to run in parallel.
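The pattern the document describes — CPU-bound work that releases the GIL can run on multiple threads at once, while GIL-holding work is serialised — can be illustrated with the standard library alone. In this sketch `zlib.compress`, which releases the GIL while compressing large buffers, stands in for the Rust-side batch conversion; the analogy (not the DataFusion implementation itself) is the point:

```python
import zlib
from concurrent.futures import ThreadPoolExecutor


def compress_batches(batches: list) -> list:
    # zlib.compress releases the GIL while crunching each large buffer,
    # so the worker threads genuinely run concurrently -- analogous to
    # converting record batches to Arrow C data outside the GIL and
    # re-acquiring it only to wrap the results.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(zlib.compress, batches))


# Eight ~1 MB "batches" of synthetic bytes.
batches = [bytes([i % 251]) * 1_000_000 for i in range(8)]
compressed = compress_batches(batches)

# Round-trip check: every compressed batch decompresses to the original.
assert [zlib.decompress(c) for c in compressed] == batches
```

With pure-Python work in place of `zlib.compress`, the same thread pool would show no speedup, which mirrors the serial behaviour the profile observed before the change.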

docs/source/user-guide/dataframe/index.rst

Lines changed: 32 additions & 9 deletions
@@ -25,8 +25,10 @@ The ``DataFrame`` class is the core abstraction in DataFusion that represents ta
 on that data. DataFrames provide a flexible API for transforming data through various operations such as
 filtering, projection, aggregation, joining, and more.
 
-A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
-terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called.
+A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
+terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called. ``collect()`` loads
+all record batches into Python memory; for large results you may want to stream data instead using
+``execute_stream()`` or ``__arrow_c_stream__()``.
 
 Creating DataFrames
 -------------------
@@ -128,27 +130,47 @@ DataFusion's DataFrame API offers a wide range of operations:
 
 Terminal Operations
 -------------------
-
-To materialize the results of your DataFrame operations:
+``collect()`` materializes every record batch in Python. While convenient, this
+eagerly loads the full result set into memory and can overwhelm the Python
+process for large queries. Alternatives that stream data from Rust avoid this
+memory growth:
 
 .. code-block:: python
 
-    # Collect all data as PyArrow RecordBatches
+    # Collect all data as PyArrow RecordBatches (loads entire result set)
     result_batches = df.collect()
-
-    # Convert to various formats
+
+    # Stream batches using the native API
+    stream = df.execute_stream()
+    for batch in stream:
+        ...  # process each RecordBatch
+
+    # Stream via the Arrow C stream interface (PyArrow >= 14)
+    import pyarrow as pa
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...
+
+    # Convert to various formats (also load all data into memory)
     pandas_df = df.to_pandas()         # Pandas DataFrame
    polars_df = df.to_polars()         # Polars DataFrame
     arrow_table = df.to_arrow_table()  # PyArrow Table
     py_dict = df.to_pydict()           # Python dictionary
     py_list = df.to_pylist()           # Python list of dictionaries
-
+
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()
 
+For large outputs, prefer engine-level writers such as ``df.write_parquet()``
+or other DataFusion writers. These stream data directly to the destination and
+avoid buffering the entire dataset in Python.
+
+For more on parallel record batch conversion and the Python GIL, see
+:doc:`collect-gil`.
+
 HTML Rendering
 --------------
 
@@ -207,3 +229,4 @@ For a complete list of available functions, see the :py:mod:`datafusion.function
    :maxdepth: 1
 
    rendering
+   collect-gil
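The collect-versus-stream trade-off the section documents can be shown with a stdlib-only analogy: a generator of batches stands in for ``execute_stream()`` (each batch is processed and then dropped), while building a list stands in for ``collect()`` (all batches resident at once). The helper names are illustrative, not DataFusion APIs:

```python
from typing import Iterable, Iterator, List


def make_batches(n: int, size: int) -> Iterator[List[int]]:
    # Lazily yield n "record batches" of `size` integers each.
    for i in range(n):
        yield list(range(i * size, (i + 1) * size))


def collect_all(source: Iterable[List[int]]) -> List[List[int]]:
    # Analogue of collect(): every batch is held in memory simultaneously.
    return list(source)


def stream_sum(source: Iterable[List[int]]) -> int:
    # Analogue of execute_stream(): one batch in memory at a time.
    total = 0
    for batch in source:
        total += sum(batch)
    return total


# Both paths see the same data; only the peak memory profile differs.
assert stream_sum(make_batches(4, 3)) == sum(range(12))
assert len(collect_all(make_batches(4, 3))) == 4
```

The streaming path keeps peak memory proportional to one batch rather than the whole result, which is the same reason the docs recommend ``execute_stream()`` or engine-level writers for large outputs.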

docs/source/user-guide/io/arrow.rst

Lines changed: 24 additions & 7 deletions
@@ -57,17 +57,34 @@ and returns a ``StructArray``. Common pyarrow sources you can use are:
 Exporting from DataFusion
 -------------------------
 
-DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
-Python library that accepts these can import a DataFusion DataFrame directly.
+DataFusion DataFrames implement ``__arrow_c_stream__`` so any Python library
+that accepts this interface can import a DataFusion ``DataFrame`` directly.
 
-.. warning::
-    It is important to note that this will cause the DataFrame execution to happen, which may be
-    a time consuming task. That is, you will cause a
-    :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.
+``collect()`` or ``pa.table(df)`` will materialize every record batch in
+Python. For large results this can quickly exhaust memory. Instead, stream the
+output incrementally:
 
+.. ipython:: python
+
+    # Stream batches with DataFusion's native API
+    stream = df.execute_stream()
+    for batch in stream:
+        ...  # process each RecordBatch as it arrives
+
+.. ipython:: python
+
+    # Build a lazy reader over the DataFrame's Arrow C stream (PyArrow >= 14)
+    import pyarrow as pa
+    reader = pa.RecordBatchReader.from_stream(df)
+    for batch in reader:
+        ...  # process each batch without buffering the entire table
+
+If the goal is simply to persist results, prefer engine-level writers such as
+``df.write_parquet()``. These writers stream data from Rust directly to the
+destination and avoid Python-side memory growth.
 
 .. ipython:: python
 
     df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
-    pa.table(df)
+    pa.table(df)  # loads all batches into memory
python/datafusion/context.py

Lines changed: 7 additions & 1 deletion
@@ -161,7 +161,13 @@ def with_batch_size(self, batch_size: int) -> SessionConfig:
     def with_target_partitions(self, target_partitions: int) -> SessionConfig:
         """Customize the number of target partitions for query execution.
 
-        Increasing partitions can increase concurrency.
+        Each partition is processed on its own thread, so this value controls
+        the degree of parallelism. A good starting point is the number of
+        logical CPU cores on your machine, for example
+        ``SessionConfig().with_target_partitions(os.cpu_count())``.
+
+        See the :ref:`configuration guide <target_partitions>` for more
+        discussion on choosing a value.
 
         Args:
             target_partitions: Number of target partitions.
