Commit fe80e50

revert branch UNPICK
1 parent 712636b commit fe80e50

File tree

14 files changed: +379 -693 lines changed


Cargo.lock

Lines changed: 285 additions & 297 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 2 deletions
@@ -26,7 +26,7 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.78"
-include = ["/src", "/datafusion", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
+include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]

 [features]
 default = ["mimalloc"]

@@ -48,7 +48,6 @@ uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
 async-trait = "0.1.89"
 futures = "0.3"
-rayon = "1.10"
 object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
 log = "0.4.27"

benchmarks/collect_gil_bench.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

docs/source/user-guide/configuration.rst

Lines changed: 0 additions & 21 deletions
@@ -47,26 +47,5 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
     print(ctx)


-.. _target_partitions:
-
-Target partitions and threads
------------------------------
-
-The :py:meth:`~datafusion.context.SessionConfig.with_target_partitions` method
-controls how many partitions DataFusion uses when executing a query. Each
-partition is processed on its own thread, so this setting effectively limits
-the number of threads that will be scheduled.
-
-For most workloads a good starting value is the number of logical CPU cores on
-your machine. You can use :func:`os.cpu_count` to automatically configure this::
-
-    import os
-    config = SessionConfig().with_target_partitions(os.cpu_count())
-
-Choosing a value significantly higher than the available cores can lead to
-excessive context switching without performance gains, while a much lower value
-may underutilize the machine.
-
-
 You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
 and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
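
Note: the removed guidance reduces to a single configuration call. A minimal sketch of the same idea, assuming only the public SessionConfig/SessionContext API referenced above:

    import os

    from datafusion import SessionConfig, SessionContext

    # Size target partitions (and therefore worker threads) to the logical CPU
    # count, falling back to 1 when os.cpu_count() returns None.
    config = SessionConfig().with_target_partitions(os.cpu_count() or 1)
    ctx = SessionContext(config)
    print(ctx)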

docs/source/user-guide/dataframe/collect-gil.md

Lines changed: 0 additions & 26 deletions
This file was deleted.

docs/source/user-guide/dataframe/index.rst

Lines changed: 9 additions & 32 deletions
@@ -25,10 +25,8 @@ The ``DataFrame`` class is the core abstraction in DataFusion that represents ta
 on that data. DataFrames provide a flexible API for transforming data through various operations such as
 filtering, projection, aggregation, joining, and more.

-A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
-terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called. ``collect()`` loads
-all record batches into Python memory; for large results you may want to stream data instead using
-``execute_stream()`` or ``__arrow_c_stream__()``.
+A DataFrame represents a logical plan that is lazily evaluated. The actual execution occurs only when
+terminal operations like ``collect()``, ``show()``, or ``to_pandas()`` are called.

 Creating DataFrames
 -------------------

@@ -130,47 +128,27 @@ DataFusion's DataFrame API offers a wide range of operations:

 Terminal Operations
 -------------------
-``collect()`` materializes every record batch in Python. While convenient, this
-eagerly loads the full result set into memory and can overwhelm the Python
-process for large queries. Alternatives that stream data from Rust avoid this
-memory growth:
+
+To materialize the results of your DataFrame operations:

 .. code-block:: python

-    # Collect all data as PyArrow RecordBatches (loads entire result set)
+    # Collect all data as PyArrow RecordBatches
     result_batches = df.collect()
-
-    # Stream batches using the native API
-    stream = df.execute_stream()
-    for batch in stream:
-        ...  # process each RecordBatch
-
-    # Stream via the Arrow C Data Interface
-    import pyarrow as pa
-    reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...
-
-    # Convert to various formats (also load all data into memory)
+
+    # Convert to various formats
     pandas_df = df.to_pandas()         # Pandas DataFrame
     polars_df = df.to_polars()         # Polars DataFrame
     arrow_table = df.to_arrow_table()  # PyArrow Table
     py_dict = df.to_pydict()           # Python dictionary
     py_list = df.to_pylist()           # Python list of dictionaries
-
+
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

-For large outputs, prefer engine-level writers such as ``df.write_parquet()``
-or other DataFusion writers. These stream data directly to the destination and
-avoid buffering the entire dataset in Python.
-
-For more on parallel record batch conversion and the Python GIL, see
-:doc:`collect-gil`.
-
 HTML Rendering
 --------------

@@ -229,4 +207,3 @@ For a complete list of available functions, see the :py:mod:`datafusion.function
    :maxdepth: 1

    rendering
-   collect-gil
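
For reference, the deleted streaming guidance contrasted two paths that both still exist in the API; a hedged sketch using only ``collect()`` and ``execute_stream()``, with illustrative batch handling:

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Eager: collect() materializes every RecordBatch in Python memory.
    batches = df.collect()

    # Incremental: execute_stream() yields batches one at a time,
    # which is the alternative the removed prose described.
    for batch in df.execute_stream():
        ...  # process each batch as it arrives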

docs/source/user-guide/io/arrow.rst

Lines changed: 7 additions & 24 deletions
@@ -57,34 +57,17 @@ and returns a ``StructArray``. Common pyarrow sources you can use are:
 Exporting from DataFusion
 -------------------------

-DataFusion DataFrames implement ``__arrow_c_stream__`` so any Python library
-that accepts this interface can import a DataFusion ``DataFrame`` directly.
+DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
+Python library that accepts these can import a DataFusion DataFrame directly.

-``collect()`` or ``pa.table(df)`` will materialize every record batch in
-Python. For large results this can quickly exhaust memory. Instead, stream the
-output incrementally:
+.. warning::
+    It is important to note that this will cause the DataFrame execution to happen, which may be
+    a time consuming task. That is, you will cause a
+    :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.

-.. ipython:: python
-
-    # Stream batches with DataFusion's native API
-    stream = df.execute_stream()
-    for batch in stream:
-        ...  # process each RecordBatch as it arrives
-
-.. ipython:: python
-
-    # Expose a C stream that PyArrow can consume lazily
-    import pyarrow as pa
-    reader = pa.ipc.RecordBatchStreamReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...  # process each batch without buffering the entire table
-
-If the goal is simply to persist results, prefer engine-level writers such as
-``df.write_parquet()``. These writers stream data from Rust directly to the
-destination and avoid Python-side memory growth.

 .. ipython:: python

     df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
-    pa.table(df)  # loads all batches into memory
+    pa.table(df)
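
The surviving example amounts to handing the DataFrame to PyArrow. A small self-contained sketch of that export path (the sample data and column names here are illustrative, not from the repo):

    import pyarrow as pa

    from datafusion import SessionContext, col, lit

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
    df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))

    # pa.table() consumes the DataFrame through __arrow_c_stream__; as the new
    # warning notes, this triggers execution (effectively a collect()).
    table = pa.table(df)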

python/datafusion/context.py

Lines changed: 1 addition & 7 deletions
@@ -161,13 +161,7 @@ def with_batch_size(self, batch_size: int) -> SessionConfig:
     def with_target_partitions(self, target_partitions: int) -> SessionConfig:
         """Customize the number of target partitions for query execution.

-        Each partition is processed on its own thread, so this value controls
-        the degree of parallelism. A good starting point is the number of
-        logical CPU cores on your machine, for example
-        ``SessionConfig().with_target_partitions(os.cpu_count())``.
-
-        See the :ref:`configuration guide <target_partitions>` for more
-        discussion on choosing a value.
+        Increasing partitions can increase concurrency.

         Args:
             target_partitions: Number of target partitions.

python/tests/test_dataframe.py

Lines changed: 23 additions & 57 deletions
@@ -20,7 +20,6 @@
 import re
 import threading
 import time
-import tracemalloc
 from typing import Any

 import pyarrow as pa

@@ -253,6 +252,13 @@ def test_filter(df):
     assert result.column(2) == pa.array([5])


+def test_show_empty(df, capsys):
+    df_empty = df.filter(column("a") > literal(3))
+    df_empty.show()
+    captured = capsys.readouterr()
+    assert "DataFrame has no rows" in captured.out
+
+
 def test_sort(df):
     df = df.sort(column("b").sort(ascending=False))

@@ -1384,27 +1390,6 @@ def test_collect_partitioned():
     assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()


-def test_collect_multiple_batches_to_pyarrow():
-    ctx = SessionContext()
-
-    batch1 = pa.RecordBatch.from_arrays(
-        [pa.array([1, 2])],
-        names=["a"],
-    )
-    batch2 = pa.RecordBatch.from_arrays(
-        [pa.array([3, 4])],
-        names=["a"],
-    )
-
-    df = ctx.create_dataframe([[batch1], [batch2]])
-
-    batches = df.collect()
-
-    assert len(batches) == 2
-    table = pa.Table.from_batches(batches)
-    assert table.column("a").to_pylist() == [1, 2, 3, 4]
-
-
 def test_union(ctx):
     batch = pa.RecordBatch.from_arrays(
         [pa.array([1, 2, 3]), pa.array([4, 5, 6])],

@@ -1485,24 +1470,6 @@ def test_empty_to_pandas(df):
     assert set(pandas_df.columns) == {"a", "b", "c"}


-def test_show_no_batches(capsys):
-    """Ensure showing a query with no batches still prints headers."""
-    ctx = SessionContext()
-    df = ctx.sql("SELECT 1 AS a WHERE 1=0")
-    df.show()
-    captured = capsys.readouterr()
-    assert "| a |" in captured.out
-    assert "Empty DataFrame" not in captured.out
-
-
-def test_show_empty_dataframe(df, capsys):
-    """Ensure showing an empty DataFrame prints a helpful message."""
-    empty_df = df.limit(0)
-    empty_df.show()
-    captured = capsys.readouterr()
-    assert "Empty DataFrame" in captured.out
-
-
 def test_to_polars(df):
     # Skip test if polars is not installed
     pl = pytest.importorskip("polars")

@@ -1607,23 +1574,6 @@ async def test_execute_stream_partitioned_async(df):
     assert not remaining_batches


-def test_arrow_c_stream_streaming(large_df):
-    df = large_df.repartition(4)
-    capsule = df.__arrow_c_stream__()
-    ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
-    ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
-    ptr = ctypes.pythonapi.PyCapsule_GetPointer(capsule, b"arrow_array_stream")
-    reader = pa.RecordBatchReader._import_from_c(ptr)
-
-    tracemalloc.start()
-    batch_count = sum(1 for _ in reader)
-    _current, peak = tracemalloc.get_traced_memory()
-    tracemalloc.stop()
-
-    assert batch_count > 1
-    assert peak < 50 * MB
-
-
 def test_empty_to_arrow_table(df):
     # Convert empty datafusion dataframe to pyarrow Table
     pyarrow_table = df.limit(0).to_arrow_table()

@@ -2714,3 +2664,19 @@ def trigger_interrupt():

     # Make sure the interrupt thread has finished
     interrupt_thread.join(timeout=1.0)
+
+
+def test_show_select_where_no_rows(capsys) -> None:
+    ctx = SessionContext()
+    df = ctx.sql("SELECT 1 WHERE 1=0")
+    df.show()
+    out = capsys.readouterr().out
+    assert "DataFrame has no rows" in out
+
+
+def test_show_from_empty_batch(capsys) -> None:
+    ctx = SessionContext()
+    batch = pa.record_batch([pa.array([], type=pa.int32())], names=["a"])
+    ctx.create_dataframe([[batch]]).show()
+    out = capsys.readouterr().out
+    assert "| a |" in out
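
The behaviour these replacement tests pin down can be reproduced interactively; a minimal sketch, with the expected message string taken from the assertions above:

    from datafusion import SessionContext

    ctx = SessionContext()

    # A query that returns zero rows should print a friendly message
    # rather than an empty table.
    ctx.sql("SELECT 1 WHERE 1=0").show()
    # expected output contains: "DataFrame has no rows"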

python/tests/test_record_batch_stream.py

Lines changed: 0 additions & 18 deletions
This file was deleted.
