 // under the License.

 use std::collections::HashMap;
-use std::ffi::{c_void, CStr, CString};
+use std::ffi::{CStr, CString};
 use std::sync::Arc;

 use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
@@ -39,7 +39,6 @@ use datafusion::prelude::*;
 use datafusion_ffi::table_provider::FFI_TableProvider;
 use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::PyValueError;
-use pyo3::ffi;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
@@ -967,25 +966,10 @@ impl PyDataFrame {
         };
         let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);

-        let stream = Box::new(FFI_ArrowArrayStream::new(reader));
-        let stream_ptr = Box::into_raw(stream);
-        debug_assert!(
-            !stream_ptr.is_null(),
-            "ArrowArrayStream pointer should never be null",
-        );
-        // The returned capsule allows zero-copy hand-off to PyArrow. Once
-        // PyArrow imports the capsule it assumes ownership of the stream, so we
-        // rely on PyArrow or PyO3 to manage the stream's lifetime.
-        let capsule = unsafe {
-            ffi::PyCapsule_New(stream_ptr as *mut c_void, ARROW_STREAM_NAME.as_ptr(), None)
-        };
-        if capsule.is_null() {
-            unsafe { drop(Box::from_raw(stream_ptr)) };
-            Err(PyErr::fetch(py).into())
-        } else {
-            let any = unsafe { Bound::from_owned_ptr(py, capsule) };
-            Ok(any.downcast_into::<PyCapsule>().unwrap())
-        }
+        let stream = FFI_ArrowArrayStream::new(reader);
+        // The returned capsule allows zero-copy hand-off to PyArrow.
+        let capsule = PyCapsule::new(py, stream, Some(ARROW_STREAM_NAME.into()))?;
+        Ok(capsule)
     }

     fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
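For context, here is a minimal sketch of the pattern this diff adopts. It is not the exact datafusion-python code, and the helper name reader_into_stream_capsule is hypothetical: the point is that PyCapsule::new takes ownership of the FFI_ArrowArrayStream and drops it (which in turn calls the stream's release callback) when the capsule is garbage-collected, so the manual Box::into_raw / ffi::PyCapsule_New / error-path Box::from_raw handling removed above is no longer needed.

// Minimal sketch, assuming pyo3 with the Bound API and arrow's ffi_stream
// module; reader_into_stream_capsule is a hypothetical helper name.
use std::ffi::CString;

use arrow::array::RecordBatchReader;
use arrow::ffi_stream::FFI_ArrowArrayStream;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;

fn reader_into_stream_capsule<'py>(
    py: Python<'py>,
    reader: Box<dyn RecordBatchReader + Send>,
) -> PyResult<Bound<'py, PyCapsule>> {
    // Wrap the reader in the Arrow C stream interface struct.
    let stream = FFI_ArrowArrayStream::new(reader);
    // The capsule must be named "arrow_array_stream" per the Arrow PyCapsule
    // interface. PyCapsule::new stores `stream` inside the capsule and drops
    // it when the capsule is destroyed, so no manual cleanup path is needed.
    let name = CString::new("arrow_array_stream").expect("static name has no interior NUL");
    PyCapsule::new(py, stream, Some(name))
}

On the Python side, a consumer that understands the Arrow PyCapsule interface (for example, a recent PyArrow via RecordBatchReader.from_stream) can import the capsule and take over the stream without copying the data.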