1616// under the License.
1717
1818use std:: collections:: HashMap ;
19- use std:: ffi:: { CStr , CString } ;
20- use std:: os:: raw:: c_void;
19+ use std:: ffi:: CString ;
2120use std:: sync:: Arc ;
2221
2322use arrow:: array:: { new_null_array, RecordBatch , RecordBatchReader } ;
@@ -40,7 +39,6 @@ use datafusion::prelude::*;
4039use datafusion_ffi:: table_provider:: FFI_TableProvider ;
4140use futures:: { StreamExt , TryStreamExt } ;
4241use pyo3:: exceptions:: PyValueError ;
43- use pyo3:: ffi;
4442use pyo3:: prelude:: * ;
4543use pyo3:: pybacked:: PyBackedStr ;
4644use pyo3:: types:: { PyCapsule , PyList , PyTuple , PyTupleMethods } ;
@@ -60,29 +58,6 @@ use crate::{
6058 expr:: { sort_expr:: PySortExpr , PyExpr } ,
6159} ;
6260
63- #[ allow( clippy:: manual_c_str_literals) ]
64- static ARROW_STREAM_NAME : & CStr =
65- unsafe { CStr :: from_bytes_with_nul_unchecked ( b"arrow_array_stream\0 " ) } ;
66-
67- /// Capsule destructor for [`FFI_ArrowArrayStream`].
68- ///
69- /// PyArrow currently takes ownership of the stream and nulls out the capsule's
70- /// pointer when importing. This destructor defensively checks for a null pointer
71- /// to guard against double free until PyArrow is removed.
72- unsafe extern "C" fn drop_arrow_array_stream_capsule ( capsule : * mut ffi:: PyObject ) {
73- // Safety: `capsule` is expected to be a valid `PyCapsule` containing a
74- // `FFI_ArrowArrayStream` created by this module.
75- let ptr = unsafe { ffi:: PyCapsule_GetPointer ( capsule, ARROW_STREAM_NAME . as_ptr ( ) ) } ;
76- if ptr. is_null ( ) {
77- return ;
78- }
79- // Reconstruct and drop the stream then null out the capsule pointer.
80- unsafe {
81- drop ( Box :: from_raw ( ptr. cast :: < FFI_ArrowArrayStream > ( ) ) ) ;
82- ffi:: PyCapsule_SetPointer ( capsule, std:: ptr:: null_mut ( ) ) ;
83- }
84- }
85-
8661// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
8762// - we have not decided on the table_provider approach yet
8863// this is an interim implementation
@@ -988,25 +963,9 @@ impl PyDataFrame {
988963 let reader: Box < dyn RecordBatchReader + Send > = Box :: new ( reader) ;
989964
990965 let stream = FFI_ArrowArrayStream :: new ( reader) ;
991- // This custom destructor is defensive to avoid a double free. It is
992- // a temporary workaround until DataFusion's Python bindings decouple
993- // from PyArrow.
994- // The returned capsule allows zero-copy hand-off to PyArrow. When
995- // PyArrow imports the capsule it assumes ownership of the stream and
996- // nulls out the capsule's internal pointer so the destructor does not
997- // free it twice.
998- unsafe {
999- let capsule = ffi:: PyCapsule_New (
1000- Box :: into_raw ( Box :: new ( stream) ) as * mut c_void ,
1001- ARROW_STREAM_NAME . as_ptr ( ) ,
1002- Some ( drop_arrow_array_stream_capsule) ,
1003- ) ;
1004- if capsule. is_null ( ) {
1005- Err ( PyErr :: fetch ( py) . into ( ) )
1006- } else {
1007- Ok ( Bound :: from_owned_ptr ( py, capsule) . downcast_into_unchecked ( ) )
1008- }
1009- }
966+ let name = CString :: new ( "arrow_array_stream" ) . unwrap ( ) ;
967+ let capsule = PyCapsule :: new ( py, stream, Some ( name) ) . map_err ( py_datafusion_err) ?;
968+ Ok ( capsule)
1010969 }
1011970
1012971 fn execute_stream ( & self , py : Python ) -> PyDataFusionResult < PyRecordBatchStream > {
0 commit comments