Commit cf7b5b7

UNPICK
1 parent 7fbeeac commit cf7b5b7

File tree: 10 files changed, +56 -439 lines

docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 22 deletions
@@ -145,31 +145,10 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()
 
-PyArrow Streaming
------------------
-
-DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
-zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
-Earlier versions eagerly converted the entire DataFrame when exporting to
-PyArrow, which could exhaust memory on large datasets. With streaming, batches
-are produced lazily so you can process arbitrarily large results without
-out-of-memory errors.
-
-.. code-block:: python
-
-    import pyarrow as pa
-
-    # Create a PyArrow RecordBatchReader without materializing all batches
-    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
-    for batch in reader:
-        ...  # process each batch as it is produced
-
-See :doc:`../io/arrow` for additional details on the Arrow interface.
-
 HTML Rendering
 --------------
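Although the streaming section is removed, the ``__arrow_c_stream__`` protocol itself remains on ``DataFrame`` (see the ``dataframe.py`` hunk below); per the updated docstring it now executes and collects the DataFrame before exporting. A minimal sketch of consuming the exported stream from PyArrow, assuming PyArrow 14.0 or later for ``RecordBatchReader.from_stream`` and PyCapsule support in ``pa.table``:

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a UNION ALL SELECT 2 AS a")

# Both calls below go through DataFrame.__arrow_c_stream__; after this
# commit the export executes and collects the DataFrame up front rather
# than streaming batches lazily.
reader = pa.RecordBatchReader.from_stream(df)  # PyArrow >= 14.0
for batch in reader:
    print(batch.num_rows)

table = pa.table(df)  # one-step conversion via the same protocol
```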

python/datafusion/_testing.py

Lines changed: 0 additions & 42 deletions
This file was deleted.

python/datafusion/context.py

Lines changed: 0 additions & 1 deletion
@@ -731,7 +731,6 @@ def from_polars(self, data: pl.DataFrame, name: str | None = None) -> DataFrame:
         """
         return DataFrame(self.ctx.from_polars(data, name))
 
-
     # https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
     # is the discussion on how we arrived at adding register_view
     def register_view(self, name: str, df: DataFrame) -> None:

python/datafusion/dataframe.py

Lines changed: 6 additions & 23 deletions
@@ -26,7 +26,6 @@
     TYPE_CHECKING,
     Any,
     Iterable,
-    Iterator,
     Literal,
     Optional,
     Union,
@@ -1099,37 +1098,21 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame:
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
 
     def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export the DataFrame as an Arrow C Stream.
+        """Export an Arrow PyCapsule Stream.
 
-        The DataFrame is executed using DataFusion's streaming APIs and exposed via
-        Arrow's C Stream interface. Record batches are produced incrementally, so the
-        full result set is never materialized in memory. When ``requested_schema`` is
-        provided, only straightforward projections such as column selection or
-        reordering are applied.
+        This will execute and collect the DataFrame. We will attempt to respect the
+        requested schema, but only trivial transformations will be applied such as only
+        returning the fields listed in the requested schema if their data types match
+        those in the DataFrame.
 
         Args:
            requested_schema: Attempt to provide the DataFrame using this schema.
 
        Returns:
-            Arrow PyCapsule object representing an ``ArrowArrayStream``.
+            Arrow PyCapsule object.
        """
-        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
-        # ``execute_stream`` under the hood to stream batches one at a time.
        return self.df.__arrow_c_stream__(requested_schema)
 
-    def __iter__(self) -> Iterator[pa.RecordBatch]:
-        """Yield record batches from the DataFrame without materializing results.
-
-        This implementation streams record batches via the Arrow C Stream
-        interface, allowing callers such as :func:`pyarrow.Table.from_batches` to
-        consume results lazily. The DataFrame is executed using DataFusion's
-        streaming APIs so ``collect`` is never invoked.
-        """
-        import pyarrow as pa
-
-        reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
-        yield from reader
-
     def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
        """Apply a function to the current DataFrame which returns another DataFrame.

python/tests/test_dataframe.py

Lines changed: 0 additions & 135 deletions
@@ -1582,37 +1582,6 @@ def test_empty_to_arrow_table(df):
     assert set(pyarrow_table.column_names) == {"a", "b", "c"}
 
 
-def test_arrow_c_stream_to_table(monkeypatch):
-    ctx = SessionContext()
-
-    # Create a DataFrame with two separate record batches
-    batch1 = pa.record_batch([pa.array([1])], names=["a"])
-    batch2 = pa.record_batch([pa.array([2])], names=["a"])
-    df = ctx.create_dataframe([[batch1], [batch2]])
-
-    # Fail if the DataFrame is pre-collected
-    def fail_collect(self):  # pragma: no cover - failure path
-        msg = "collect should not be called"
-        raise AssertionError(msg)
-
-    monkeypatch.setattr(DataFrame, "collect", fail_collect)
-
-    table = pa.Table.from_batches(df)
-    expected = pa.Table.from_batches([batch1, batch2])
-
-    assert table.equals(expected)
-    assert table.schema == df.schema()
-    assert table.column("a").num_chunks == 2
-
-
-def test_arrow_c_stream_reader(df):
-    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
-    assert isinstance(reader, pa.RecordBatchReader)
-    table = pa.Table.from_batches(reader)
-    expected = pa.Table.from_batches(df.collect())
-    assert table.equals(expected)
-
-
 def test_to_pylist(df):
     # Convert datafusion dataframe to Python list
     pylist = df.to_pylist()
@@ -2697,110 +2666,6 @@ def trigger_interrupt():
     interrupt_thread.join(timeout=1.0)
 
 
-def test_arrow_c_stream_interrupted():
-    """__arrow_c_stream__ responds to ``KeyboardInterrupt`` signals.
-
-    Similar to ``test_collect_interrupted`` this test issues a long running
-    query, but consumes the results via ``__arrow_c_stream__``. It then raises
-    ``KeyboardInterrupt`` in the main thread and verifies that the stream
-    iteration stops promptly with the appropriate exception.
-    """
-
-    ctx = SessionContext()
-
-    batches = []
-    for i in range(10):
-        batch = pa.RecordBatch.from_arrays(
-            [
-                pa.array(list(range(i * 1000, (i + 1) * 1000))),
-                pa.array([f"value_{j}" for j in range(i * 1000, (i + 1) * 1000)]),
-            ],
-            names=["a", "b"],
-        )
-        batches.append(batch)
-
-    ctx.register_record_batches("t1", [batches])
-    ctx.register_record_batches("t2", [batches])
-
-    df = ctx.sql(
-        """
-        WITH t1_expanded AS (
-            SELECT
-                a,
-                b,
-                CAST(a AS DOUBLE) / 1.5 AS c,
-                CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS d
-            FROM t1
-            CROSS JOIN (SELECT 1 AS dummy FROM t1 LIMIT 5)
-        ),
-        t2_expanded AS (
-            SELECT
-                a,
-                b,
-                CAST(a AS DOUBLE) * 2.5 AS e,
-                CAST(a AS DOUBLE) * CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS f
-            FROM t2
-            CROSS JOIN (SELECT 1 AS dummy FROM t2 LIMIT 5)
-        )
-        SELECT
-            t1.a, t1.b, t1.c, t1.d,
-            t2.a AS a2, t2.b AS b2, t2.e, t2.f
-        FROM t1_expanded t1
-        JOIN t2_expanded t2 ON t1.a % 100 = t2.a % 100
-        WHERE t1.a > 100 AND t2.a > 100
-        """
-    )
-
-    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
-
-    interrupted = False
-    interrupt_error = None
-    query_started = threading.Event()
-    max_wait_time = 5.0
-
-    def trigger_interrupt():
-        start_time = time.time()
-        while not query_started.is_set():
-            time.sleep(0.1)
-            if time.time() - start_time > max_wait_time:
-                msg = f"Query did not start within {max_wait_time} seconds"
-                raise RuntimeError(msg)
-
-        thread_id = threading.main_thread().ident
-        if thread_id is None:
-            msg = "Cannot get main thread ID"
-            raise RuntimeError(msg)
-
-        exception = ctypes.py_object(KeyboardInterrupt)
-        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
-            ctypes.c_long(thread_id), exception
-        )
-        if res != 1:
-            ctypes.pythonapi.PyThreadState_SetAsyncExc(
-                ctypes.c_long(thread_id), ctypes.py_object(0)
-            )
-            msg = "Failed to raise KeyboardInterrupt in main thread"
-            raise RuntimeError(msg)
-
-    interrupt_thread = threading.Thread(target=trigger_interrupt)
-    interrupt_thread.daemon = True
-    interrupt_thread.start()
-
-    try:
-        query_started.set()
-        # consume the reader which should block and be interrupted
-        reader.read_all()
-    except KeyboardInterrupt:
-        interrupted = True
-    except Exception as e:  # pragma: no cover - unexpected errors
-        interrupt_error = e
-
-    if not interrupted:
-        pytest.fail(f"Stream was not interrupted; got error: {interrupt_error}")
-
-    interrupt_thread.join(timeout=1.0)
-
-
 def test_show_select_where_no_rows(capsys) -> None:
     ctx = SessionContext()
     df = ctx.sql("SELECT 1 WHERE 1=0")

python/tests/test_io.py

Lines changed: 1 addition & 44 deletions
@@ -17,9 +17,7 @@
 from pathlib import Path
 
 import pyarrow as pa
-import pytest
-from datafusion import DataFrame, column
-from datafusion._testing import range_table
+from datafusion import column
 from datafusion.io import read_avro, read_csv, read_json, read_parquet
 
 
@@ -94,44 +92,3 @@ def test_read_avro():
     path = Path.cwd() / "testing/data/avro/alltypes_plain.avro"
     avro_df = read_avro(path=path)
     assert avro_df is not None
-
-
-def test_arrow_c_stream_large_dataset(ctx):
-    """DataFrame.__arrow_c_stream__ yields batches incrementally.
-
-    This test constructs a DataFrame that would be far larger than available
-    memory if materialized. The ``__arrow_c_stream__`` method should expose a
-    stream of record batches without collecting the full dataset, so reading a
-    handful of batches should not exhaust process memory.
-    """
-    # Create a very large DataFrame using range; this would be terabytes if collected
-    df = range_table(ctx, 0, 1 << 40)
-
-    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
-
-    # Track RSS before consuming batches
-    psutil = pytest.importorskip("psutil")
-    process = psutil.Process()
-    start_rss = process.memory_info().rss
-
-    for _ in range(5):
-        batch = reader.read_next_batch()
-        assert batch is not None
-        assert len(batch) > 0
-        current_rss = process.memory_info().rss
-        # Ensure memory usage hasn't grown substantially (>50MB)
-        assert current_rss - start_rss < 50 * 1024 * 1024
-
-
-def test_table_from_batches_stream(ctx, monkeypatch):
-    df = range_table(ctx, 0, 10)
-
-    def fail_collect(self):  # pragma: no cover - failure path
-        msg = "collect should not be called"
-        raise AssertionError(msg)
-
-    monkeypatch.setattr(DataFrame, "collect", fail_collect)
-
-    table = pa.Table.from_batches(df)
-    assert table.shape == (10, 1)
-    assert table.column_names == ["value"]
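The removed tests depend on ``range_table`` from the now-deleted ``datafusion._testing`` module, whose contents are not shown in this commit. Purely as a hypothetical sketch of such a helper (assuming DataFusion's ``generate_series`` table function, which lazily yields a single ``value`` column; the real helper may have differed):

```python
from datafusion import DataFrame, SessionContext


def range_table(ctx: SessionContext, start: int, stop: int) -> DataFrame:
    # Hypothetical reconstruction, not the deleted helper: build a lazy
    # single-column table of integers in [start, stop) named "value".
    return ctx.sql(f"SELECT value FROM generate_series({start}, {stop - 1})")
```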

src/context.rs

Lines changed: 9 additions & 3 deletions
@@ -34,7 +34,7 @@ use pyo3::prelude::*;
 use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider};
 use crate::dataframe::PyDataFrame;
 use crate::dataset::Dataset;
-use crate::errors::{py_datafusion_err, PyDataFusionResult};
+use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult};
 use crate::expr::sort_expr::PySortExpr;
 use crate::physical_plan::PyExecutionPlan;
 use crate::record_batch::PyRecordBatchStream;
@@ -45,7 +45,7 @@ use crate::udaf::PyAggregateUDF;
 use crate::udf::PyScalarUDF;
 use crate::udtf::PyTableFunction;
 use crate::udwf::PyWindowUDF;
-use crate::utils::{get_global_ctx, spawn_stream, validate_pycapsule, wait_for_future};
+use crate::utils::{get_global_ctx, get_tokio_runtime, validate_pycapsule, wait_for_future};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -66,13 +66,15 @@ use datafusion::execution::disk_manager::DiskManagerMode;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
 use datafusion::execution::options::ReadOptions;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::prelude::{
     AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
 };
 use datafusion_ffi::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
 use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
 use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};
 use pyo3::IntoPyObjectExt;
+use tokio::task::JoinHandle;
 
 /// Configuration options for a SessionContext
 #[pyclass(name = "SessionConfig", module = "datafusion", subclass)]
@@ -1130,8 +1132,12 @@ impl PySessionContext {
         py: Python,
     ) -> PyDataFusionResult<PyRecordBatchStream> {
         let ctx: TaskContext = TaskContext::from(&self.ctx.state());
+        // create a Tokio runtime to run the async code
+        let rt = &get_tokio_runtime().0;
         let plan = plan.plan.clone();
-        let stream = spawn_stream(py, async move { plan.execute(part, Arc::new(ctx)) })?;
+        let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
+        let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
         Ok(PyRecordBatchStream::new(stream))
     }
 }
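On the Python side, this Rust method backs partition-level execution of a physical plan from a ``SessionContext``. A rough usage sketch, assuming the existing ``DataFrame.execution_plan`` and ``SessionContext.execute(plan, partition)`` APIs, which this commit leaves unchanged:

```python
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a")

# Obtain the physical plan and run a single partition of it; the
# RecordBatchStream returned here is produced by the Rust code above.
plan = df.execution_plan()
stream = ctx.execute(plan, 0)
for batch in stream:
    print(batch.to_pyarrow())
```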
