Skip to content

Commit b81cdc2

Browse files
committed
refactor: simplify stream capsule creation in PyDataFrame
1 parent dd1701f commit b81cdc2

File tree

1 file changed

+4
-54
lines changed

1 file changed

+4
-54
lines changed

src/dataframe.rs

Lines changed: 4 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19-
use std::ffi::{c_void, CStr, CString};
19+
use std::ffi::CString;
2020
use std::sync::Arc;
2121

2222
use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
@@ -39,7 +39,6 @@ use datafusion::prelude::*;
3939
use datafusion_ffi::table_provider::FFI_TableProvider;
4040
use futures::{StreamExt, TryStreamExt};
4141
use pyo3::exceptions::PyValueError;
42-
use pyo3::ffi;
4342
use pyo3::prelude::*;
4443
use pyo3::pybacked::PyBackedStr;
4544
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
@@ -59,35 +58,6 @@ use crate::{
5958
expr::{sort_expr::PySortExpr, PyExpr},
6059
};
6160

62-
#[allow(clippy::manual_c_str_literals)]
63-
static ARROW_STREAM_NAME: &CStr =
64-
unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_array_stream\0") };
65-
66-
unsafe extern "C" fn drop_stream(capsule: *mut ffi::PyObject) {
67-
if capsule.is_null() {
68-
return;
69-
}
70-
71-
// When PyArrow imports this capsule it steals the raw stream pointer and
72-
// sets the capsule's internal pointer to NULL. In that case
73-
// `PyCapsule_IsValid` returns 0 and this destructor must not drop the
74-
// stream as ownership has been transferred to PyArrow. If the capsule was
75-
// never imported, the pointer remains valid and we are responsible for
76-
// freeing the stream here.
77-
if ffi::PyCapsule_IsValid(capsule, ARROW_STREAM_NAME.as_ptr()) == 1 {
78-
let stream_ptr = ffi::PyCapsule_GetPointer(capsule, ARROW_STREAM_NAME.as_ptr())
79-
as *mut FFI_ArrowArrayStream;
80-
if !stream_ptr.is_null() {
81-
drop(Box::from_raw(stream_ptr));
82-
}
83-
}
84-
85-
// `PyCapsule_GetPointer` sets a Python error on failure. Clear it only
86-
// after the stream has been released (or determined to be owned
87-
// elsewhere).
88-
ffi::PyErr_Clear();
89-
}
90-
9161
// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
9262
// - we have not decided on the table_provider approach yet
9363
// this is an interim implementation
@@ -993,29 +963,9 @@ impl PyDataFrame {
993963
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
994964

995965
let stream = Box::new(FFI_ArrowArrayStream::new(reader));
996-
let stream_ptr = Box::into_raw(stream);
997-
debug_assert!(
998-
!stream_ptr.is_null(),
999-
"ArrowArrayStream pointer should never be null",
1000-
);
1001-
// The returned capsule allows zero-copy hand-off to PyArrow. When
1002-
// PyArrow imports the capsule it assumes ownership of the stream and
1003-
// nulls out the capsule's internal pointer so `drop_stream` knows not to
1004-
// free it.
1005-
let capsule = unsafe {
1006-
ffi::PyCapsule_New(
1007-
stream_ptr as *mut c_void,
1008-
ARROW_STREAM_NAME.as_ptr(),
1009-
Some(drop_stream),
1010-
)
1011-
};
1012-
if capsule.is_null() {
1013-
unsafe { drop(Box::from_raw(stream_ptr)) };
1014-
Err(PyErr::fetch(py).into())
1015-
} else {
1016-
let any = unsafe { Bound::from_owned_ptr(py, capsule) };
1017-
Ok(any.downcast_into::<PyCapsule>().unwrap())
1018-
}
966+
967+
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
968+
Ok(PyCapsule::new(py, stream, Some(stream_capsule_name))?)
1019969
}
1020970

1021971
fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {

0 commit comments

Comments (0)