Skip to content

Commit 010b4c6

Browse files
committed
Revert "refactor: add helper for async move"
This reverts commit faabf6d.
1 parent faabf6d commit 010b4c6

File tree

3 files changed

+21
-206
lines changed

3 files changed

+21
-206
lines changed

src/context.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@ use crate::udaf::PyAggregateUDF;
4545
use crate::udf::PyScalarUDF;
4646
use crate::udtf::PyTableFunction;
4747
use crate::udwf::PyWindowUDF;
48-
use crate::utils::{
49-
call_async, call_datafusion_async, call_datafusion_async_double_question, get_global_ctx,
50-
get_tokio_runtime, validate_pycapsule, wait_for_future,
51-
};
48+
use crate::utils::{get_global_ctx, get_tokio_runtime, validate_pycapsule, wait_for_future};
5249
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
5350
use datafusion::arrow::pyarrow::PyArrowType;
5451
use datafusion::arrow::record_batch::RecordBatch;
@@ -428,8 +425,10 @@ impl PySessionContext {
428425
let ctx = self.ctx.clone();
429426
let query_owned = query.to_string();
430427

431-
// Use the helper function to handle async execution
432-
let df = call_datafusion_async(py, async move { ctx.sql(&query_owned).await })?;
428+
// Create a future that moves owned values
429+
let result_future = async move { ctx.sql(&query_owned).await };
430+
431+
let df = wait_for_future(py, result_future)?.map_err(PyDataFusionError::from)?;
433432
Ok(PyDataFrame::new(df))
434433
}
435434

@@ -451,11 +450,10 @@ impl PySessionContext {
451450
let ctx = self.ctx.clone();
452451
let query_owned = query.to_string();
453452

454-
// Use the helper function to handle async execution
455-
let df = call_datafusion_async(py, async move {
456-
ctx.sql_with_options(&query_owned, sql_options).await
457-
})?;
453+
// Create a future that moves owned values
454+
let result_future = async move { ctx.sql_with_options(&query_owned, sql_options).await };
458455

456+
let df = wait_for_future(py, result_future)?.map_err(PyDataFusionError::from)?;
459457
Ok(PyDataFrame::new(df))
460458
}
461459

@@ -720,7 +718,7 @@ impl PySessionContext {
720718
ctx.register_parquet(&name_owned, &path_owned, options)
721719
.await
722720
};
723-
let _ = wait_for_future(py, result_future)??;
721+
let _ = wait_for_future(py, result_future)?;
724722
Ok(())
725723
}
726724

@@ -1002,13 +1000,13 @@ impl PySessionContext {
10021000
let ctx = self.ctx.clone();
10031001
let name_owned = name.to_string();
10041002

1005-
// Use call_async since we need custom error mapping
1006-
let df = call_async(py, async move { ctx.table(&name_owned).await }, |e| {
1007-
PyKeyError::new_err(e.to_string()).into()
1008-
})?
1009-
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
1003+
// Create a future that moves owned values
1004+
let table_future = async move { ctx.table(&name_owned).await };
10101005

1011-
Ok(PyDataFrame::new(df))
1006+
let x = wait_for_future(py, table_future)
1007+
.map_err(|e| PyKeyError::new_err(e.to_string()))?
1008+
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
1009+
Ok(PyDataFrame::new(x))
10121010
}
10131011

10141012
pub fn execute(
@@ -1269,8 +1267,8 @@ impl PySessionContext {
12691267
// Convert table partition columns
12701268
let table_partition_cols = convert_table_partition_cols(table_partition_cols)?;
12711269

1272-
// Use the helper function to handle async execution
1273-
let df = call_datafusion_async(py, async move {
1270+
// Create a future that moves owned values
1271+
let result_future = async move {
12741272
let mut options = ParquetReadOptions::default()
12751273
.table_partition_cols(table_partition_cols)
12761274
.parquet_pruning(parquet_pruning)
@@ -1285,9 +1283,11 @@ impl PySessionContext {
12851283
options.file_sort_order = file_sort_order_converted;
12861284

12871285
ctx.read_parquet(&path_owned, options).await
1288-
})?;
1286+
};
12891287

1290-
Ok(PyDataFrame::new(df))
1288+
let df =
1289+
PyDataFrame::new(wait_for_future(py, result_future)?.map_err(PyDataFusionError::from)?);
1290+
Ok(df)
12911291
}
12921292

12931293
#[allow(clippy::too_many_arguments)]

src/utils.rs

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -100,87 +100,6 @@ where
100100
})
101101
}
102102

103-
/// Helper function to execute an async function and wait for the result.
104-
///
105-
/// This function takes a Python GIL token, an async block as a future,
106-
/// and a function to convert the error type. It executes the future and
107-
/// returns the unwrapped result.
108-
///
109-
/// # Arguments
110-
///
111-
/// * `py` - Python GIL token
112-
/// * `future` - Future to execute
113-
/// * `error_mapper` - Function to convert error type
114-
///
115-
/// # Returns
116-
///
117-
/// Result with the unwrapped value or converted error
118-
pub fn call_async<F, T, E, M>(
119-
py: Python,
120-
future: F,
121-
error_mapper: M,
122-
) -> Result<T, PyDataFusionError>
123-
where
124-
F: Future<Output = Result<T, E>> + Send + 'static,
125-
T: Send + 'static,
126-
E: std::error::Error + Send + 'static,
127-
M: Fn(E) -> PyDataFusionError,
128-
{
129-
wait_for_future(py, future)?.map_err(error_mapper)
130-
}
131-
132-
/// Helper function to execute an async function that returns a Result with DataFusionError
133-
/// and wait for the result.
134-
///
135-
/// This is a specialized version of call_async that maps DataFusionError to PyDataFusionError
136-
/// automatically.
137-
///
138-
/// # Arguments
139-
///
140-
/// * `py` - Python GIL token
141-
/// * `future` - Future to execute
142-
///
143-
/// # Returns
144-
///
145-
/// Result with the unwrapped value or PyDataFusionError
146-
pub fn call_datafusion_async<F, T>(py: Python, future: F) -> Result<T, PyDataFusionError>
147-
where
148-
F: Future<Output = Result<T, datafusion::error::DataFusionError>> + Send + 'static,
149-
T: Send + 'static,
150-
{
151-
call_async(py, future, PyDataFusionError::from)
152-
}
153-
154-
/// Helper function to execute a DataFusion async function and apply ? twice to unwrap the result.
155-
///
156-
/// This helper is useful for functions that need to apply ?? to the result of wait_for_future.
157-
///
158-
/// # Arguments
159-
///
160-
/// * `py` - Python GIL token
161-
/// * `future` - Future to execute
162-
///
163-
/// # Returns
164-
///
165-
/// Result with the unwrapped value or PyDataFusionError
166-
pub fn call_datafusion_async_double_question<F, T>(
167-
py: Python,
168-
future: F,
169-
) -> Result<(), PyDataFusionError>
170-
where
171-
F: Future<
172-
Output = Result<
173-
Result<T, datafusion::error::DataFusionError>,
174-
datafusion::error::DataFusionError,
175-
>,
176-
> + Send
177-
+ 'static,
178-
T: Send + 'static,
179-
{
180-
let _ = wait_for_future(py, future)??;
181-
Ok(())
182-
}
183-
184103
pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {
185104
Ok(match value {
186105
"immutable" => Volatility::Immutable,

src/utils/async_utils.rs

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)