Commit 36e01fe

Revert "revert branch UNPICK"
This reverts commit 8a59421.
1 parent: 8a59421

3 files changed: +98, -12 lines

docs/source/user-guide/dataframe/index.rst

Lines changed: 22 additions & 1 deletion
@@ -145,10 +145,31 @@ To materialize the results of your DataFrame operations:
 
     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()
 
+PyArrow Streaming
+-----------------
+
+DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
+zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
+Earlier versions eagerly converted the entire DataFrame when exporting to
+PyArrow, which could exhaust memory on large datasets. With streaming, batches
+are produced lazily so you can process arbitrarily large results without
+out-of-memory errors.
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    # Create a PyArrow RecordBatchReader without materializing all batches
+    reader = pa.RecordBatchReader._import_from_c(df.__arrow_c_stream__())
+    for batch in reader:
+        ...  # process each batch as it is produced
+
+See :doc:`../io/arrow` for additional details on the Arrow interface.
+
 HTML Rendering
 --------------
 
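
The docs example above goes through the private pa.RecordBatchReader._import_from_c
helper. A minimal alternative sketch, assuming a recent PyArrow (roughly 15.0 or
newer, where pa.RecordBatchReader.from_stream accepts any object implementing
__arrow_c_stream__); the DataFrame contents and column name are illustrative and
not part of this commit:

    import pyarrow as pa
    import pyarrow.compute as pc

    from datafusion import SessionContext

    # Illustrative data: a single integer column "a".
    ctx = SessionContext()
    df = ctx.from_pydict({"a": list(range(1_000))})

    # from_stream() pulls record batches through __arrow_c_stream__ lazily,
    # so only one batch needs to be resident in memory at a time.
    reader = pa.RecordBatchReader.from_stream(df)

    total = 0
    for batch in reader:
        # Aggregate per batch, then let the batch be dropped.
        total += pc.sum(batch.column("a")).as_py()

    print(total)  # 499500 for the range above

With the previous eager export the same loop would still work, but only after the
entire result set had been collected; with this change each batch is produced on
demand.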

python/tests/test_dataframe.py

Lines changed: 23 additions & 0 deletions
@@ -1582,6 +1582,29 @@ def test_empty_to_arrow_table(df):
     assert set(pyarrow_table.column_names) == {"a", "b", "c"}
 
 
+def test_arrow_c_stream_to_table(monkeypatch):
+    ctx = SessionContext()
+
+    # Create a DataFrame with two separate record batches
+    batch1 = pa.record_batch([pa.array([1])], names=["a"])
+    batch2 = pa.record_batch([pa.array([2])], names=["a"])
+    df = ctx.create_dataframe([[batch1], [batch2]])
+
+    # Fail if the DataFrame is pre-collected
+    def fail_collect(self):  # pragma: no cover - failure path
+        msg = "collect should not be called"
+        raise AssertionError(msg)
+
+    monkeypatch.setattr(DataFrame, "collect", fail_collect)
+
+    table = pa.Table.from_batches(df)
+    expected = pa.Table.from_batches([batch1, batch2])
+
+    assert table.equals(expected)
+    assert table.schema == df.schema()
+    assert table.column("a").num_chunks == 2
+
+
 def test_to_pylist(df):
     # Convert datafusion dataframe to Python list
     pylist = df.to_pylist()

src/dataframe.rs

Lines changed: 53 additions & 11 deletions
@@ -19,13 +19,13 @@ use std::collections::HashMap;
 use std::ffi::CString;
 use std::sync::Arc;
 
-use arrow::array::{new_null_array, RecordBatch, RecordBatchIterator, RecordBatchReader};
+use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
 use arrow::compute::can_cast_types;
 use arrow::error::ArrowError;
 use arrow::ffi::FFI_ArrowSchema;
 use arrow::ffi_stream::FFI_ArrowArrayStream;
 use arrow::pyarrow::FromPyArrow;
-use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
 use datafusion::common::UnnestOptions;
@@ -42,7 +42,7 @@ use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
-use tokio::task::JoinHandle;
+use tokio::{runtime::Handle, task::JoinHandle};
 
 use crate::catalog::PyTable;
 use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
@@ -354,6 +354,41 @@ impl PyDataFrame {
     }
 }
 
+struct DataFrameStreamReader {
+    stream: SendableRecordBatchStream,
+    runtime: Handle,
+    schema: SchemaRef,
+    projection: Option<SchemaRef>,
+}
+
+impl Iterator for DataFrameStreamReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.runtime.block_on(self.stream.next()) {
+            Some(Ok(batch)) => {
+                let batch = if let Some(ref schema) = self.projection {
+                    match record_batch_into_schema(batch, schema.as_ref()) {
+                        Ok(b) => b,
+                        Err(e) => return Some(Err(e)),
+                    }
+                } else {
+                    batch
+                };
+                Some(Ok(batch))
+            }
+            Some(Err(e)) => Some(Err(ArrowError::ExternalError(Box::new(e)))),
+            None => None,
+        }
+    }
+}
+
+impl RecordBatchReader for DataFrameStreamReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
 #[pymethods]
 impl PyDataFrame {
     /// Enable selection for `df[col]`, `df[col1, col2, col3]`, and `df[[col1, col2, col3]]`
@@ -879,8 +914,14 @@
         py: Python<'py>,
         requested_schema: Option<Bound<'py, PyCapsule>>,
     ) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
-        let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())??;
+        let rt = &get_tokio_runtime().0;
+        let df = self.df.as_ref().clone();
+        let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
+            rt.spawn(async move { df.execute_stream().await });
+        let stream = wait_for_future(py, async { fut.await.map_err(to_datafusion_err) })???;
+
         let mut schema: Schema = self.df.schema().to_owned().into();
+        let mut projection: Option<SchemaRef> = None;
 
         if let Some(schema_capsule) = requested_schema {
             validate_pycapsule(&schema_capsule, "arrow_schema")?;
@@ -889,16 +930,17 @@
             let desired_schema = Schema::try_from(schema_ptr)?;
 
             schema = project_schema(schema, desired_schema)?;
-
-            batches = batches
-                .into_iter()
-                .map(|record_batch| record_batch_into_schema(record_batch, &schema))
-                .collect::<Result<Vec<RecordBatch>, ArrowError>>()?;
+            projection = Some(Arc::new(schema.clone()));
         }
 
-        let batches_wrapped = batches.into_iter().map(Ok);
+        let schema_ref = projection.clone().unwrap_or_else(|| Arc::new(schema));
 
-        let reader = RecordBatchIterator::new(batches_wrapped, Arc::new(schema));
+        let reader = DataFrameStreamReader {
+            stream,
+            runtime: rt.handle().clone(),
+            schema: schema_ref,
+            projection,
+        };
         let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
 
         let ffi_stream = FFI_ArrowArrayStream::new(reader);
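
The reader above is what makes the export pull-based: each next() call blocks on
the Tokio runtime handle for exactly one batch, and any requested_schema from the
consumer is applied per batch via record_batch_into_schema instead of to a
pre-collected Vec<RecordBatch>. A consumer-side sketch of that projection path,
with illustrative data and column names that are not part of this commit
(pa.RecordBatchReader.from_stream forwards its schema argument as
requested_schema and needs a recent PyArrow, roughly 15.0 or newer):

    import pyarrow as pa

    from datafusion import SessionContext

    # Illustrative two-column DataFrame; only "a" is requested below.
    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # The schema argument is forwarded to __arrow_c_stream__ as
    # requested_schema, so the projection is applied lazily, batch by batch.
    requested = pa.schema([pa.field("a", pa.int64())])
    reader = pa.RecordBatchReader.from_stream(df, schema=requested)

    for batch in reader:
        # Each batch should already be narrowed to the requested column.
        print(batch.schema.names)  # expected: ['a']

Because the stream is driven from Python one batch at a time, memory usage stays
proportional to a single RecordBatch rather than to the full query result, which
matches the intent of the new test above: it fails if DataFrame.collect() is ever
invoked during export.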
