2 changes: 1 addition & 1 deletion src/catalog.rs
@@ -97,7 +97,7 @@ impl PyDatabase {
}

fn table(&self, name: &str, py: Python) -> PyDataFusionResult<PyTable> {
-if let Some(table) = wait_for_future(py, self.database.table(name))? {
+if let Some(table) = wait_for_future(py, self.database.table(name))?? {
Ok(PyTable::new(table))
} else {
Err(PyDataFusionError::Common(format!(
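Every call site in this diff gets the same mechanical rewrite: `wait_for_future(py, fut)?` becomes `wait_for_future(py, fut)??`. That is consistent with `wait_for_future` now returning `PyResult<F::Output>` instead of the future's output directly, presumably so the blocking wait itself can fail (for example, when interrupted by a Python signal) independently of the awaited operation. A minimal, self-contained sketch of that assumed contract, using stand-in error types rather than the crate's real `PyErr` and `DataFusionError`:

// Stand-in types for illustration only; the real code uses pyo3's PyErr and
// DataFusion's error enum.
#[derive(Debug, PartialEq)]
struct InterruptError; // the blocking wait itself was interrupted
#[derive(Debug, PartialEq)]
struct DataFusionError(&'static str); // the awaited operation failed
#[derive(Debug, PartialEq)]
struct PyErrStub(String); // what a #[pymethods] function returns on error

impl From<InterruptError> for PyErrStub {
    fn from(_: InterruptError) -> Self {
        PyErrStub("KeyboardInterrupt".into())
    }
}

impl From<DataFusionError> for PyErrStub {
    fn from(e: DataFusionError) -> Self {
        PyErrStub(e.0.into())
    }
}

// Assumed new shape: the outer Result reports the wait itself; the inner one
// is the future's own output, handed through untouched.
fn wait_for_future<T>(
    output: Result<T, DataFusionError>,
) -> Result<Result<T, DataFusionError>, InterruptError> {
    Ok(output) // pretend the wait completed without interruption
}

fn table(name: &str) -> Result<String, PyErrStub> {
    // The `??` seen throughout this diff: the first `?` unwraps the wait,
    // the second unwraps the query result.
    Ok(wait_for_future(Ok(name.to_string()))??)
}

fn main() {
    assert_eq!(table("t1"), Ok("t1".to_string()));
}

The first `?` surfaces a failed wait as a Python exception; the second propagates the query's own error exactly as the single `?` did before.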
50 changes: 30 additions & 20 deletions src/context.rs
@@ -375,7 +375,7 @@ impl PySessionContext {
None => {
let state = self.ctx.state();
let schema = options.infer_schema(&state, &table_path);
-wait_for_future(py, schema)?
+wait_for_future(py, schema)??
}
};
let config = ListingTableConfig::new(table_path)
@@ -400,7 +400,7 @@ impl PySessionContext {
/// Returns a PyDataFrame whose plan corresponds to the SQL statement.
pub fn sql(&mut self, query: &str, py: Python) -> PyDataFusionResult<PyDataFrame> {
let result = self.ctx.sql(query);
-let df = wait_for_future(py, result)?;
+let df = wait_for_future(py, result)??;
Ok(PyDataFrame::new(df))
}

@@ -417,7 +417,7 @@ impl PySessionContext {
SQLOptions::new()
};
let result = self.ctx.sql_with_options(query, options);
-let df = wait_for_future(py, result)?;
+let df = wait_for_future(py, result)??;
Ok(PyDataFrame::new(df))
}

@@ -451,7 +451,7 @@ impl PySessionContext {

self.ctx.register_table(&*table_name, Arc::new(table))?;

-let table = wait_for_future(py, self._table(&table_name))?;
+let table = wait_for_future(py, self._table(&table_name))??;

let df = PyDataFrame::new(table);
Ok(df)
@@ -650,7 +650,7 @@ impl PySessionContext {
.collect();

let result = self.ctx.register_parquet(name, path, options);
-wait_for_future(py, result)?;
+wait_for_future(py, result)??;
Ok(())
}

@@ -693,11 +693,11 @@ impl PySessionContext {
if path.is_instance_of::<PyList>() {
let paths = path.extract::<Vec<String>>()?;
let result = self.register_csv_from_multiple_paths(name, paths, options);
-wait_for_future(py, result)?;
+wait_for_future(py, result)??;
} else {
let path = path.extract::<String>()?;
let result = self.ctx.register_csv(name, &path, options);
-wait_for_future(py, result)?;
+wait_for_future(py, result)??;
}

Ok(())
@@ -734,7 +734,7 @@ impl PySessionContext {
options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_json(name, path, options);
-wait_for_future(py, result)?;
+wait_for_future(py, result)??;

Ok(())
}
@@ -764,7 +764,7 @@ impl PySessionContext {
options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_avro(name, path, options);
-wait_for_future(py, result)?;
+wait_for_future(py, result)??;

Ok(())
}
@@ -825,9 +825,19 @@ impl PySessionContext {
}

pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
-let x = wait_for_future(py, self.ctx.table(name))
+let res = wait_for_future(py, self.ctx.table(name))
.map_err(|e| PyKeyError::new_err(e.to_string()))?;
-Ok(PyDataFrame::new(x))
+match res {
+Ok(df) => Ok(PyDataFrame::new(df)),
+Err(e) => {
+if let datafusion::error::DataFusionError::Plan(msg) = &e {
+if msg.contains("No table named") {
+return Err(PyKeyError::new_err(msg.to_string()));
+}
+}
+Err(py_datafusion_err(e))
+}
+}
}

pub fn table_exist(&self, name: &str) -> PyDataFusionResult<bool> {
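The rewritten `table` above special-cases one failure mode: a planning error whose message names a missing table is mapped to Python's `KeyError`, so `ctx.table("nope")` raises the same exception type a failed dict lookup would, while every other error keeps the generic DataFusion wrapper. A small sketch of that classification rule (the enum here is a simplified stand-in for `datafusion::error::DataFusionError`):

// Simplified stand-in for datafusion::error::DataFusionError, for
// illustration only.
enum DataFusionError {
    Plan(String),
    Execution(String),
}

// The rule the new `table` body applies before falling back to the generic
// error conversion.
fn is_missing_table(e: &DataFusionError) -> bool {
    matches!(e, DataFusionError::Plan(msg) if msg.contains("No table named"))
}

fn main() {
    assert!(is_missing_table(&DataFusionError::Plan(
        "No table named 'nope'".to_string()
    )));
    assert!(!is_missing_table(&DataFusionError::Execution(
        "out of memory".to_string()
    )));
}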
@@ -865,10 +875,10 @@ impl PySessionContext {
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let result = self.ctx.read_json(path, options);
-wait_for_future(py, result)?
+wait_for_future(py, result)??
} else {
let result = self.ctx.read_json(path, options);
-wait_for_future(py, result)?
+wait_for_future(py, result)??
};
Ok(PyDataFrame::new(df))
}
@@ -915,12 +925,12 @@ impl PySessionContext {
let paths = path.extract::<Vec<String>>()?;
let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>();
let result = self.ctx.read_csv(paths, options);
-let df = PyDataFrame::new(wait_for_future(py, result)?);
+let df = PyDataFrame::new(wait_for_future(py, result)??);
Ok(df)
} else {
let path = path.extract::<String>()?;
let result = self.ctx.read_csv(path, options);
-let df = PyDataFrame::new(wait_for_future(py, result)?);
+let df = PyDataFrame::new(wait_for_future(py, result)??);
Ok(df)
}
}
@@ -958,7 +968,7 @@ impl PySessionContext {
.collect();

let result = self.ctx.read_parquet(path, options);
-let df = PyDataFrame::new(wait_for_future(py, result)?);
+let df = PyDataFrame::new(wait_for_future(py, result)??);
Ok(df)
}

@@ -978,10 +988,10 @@ impl PySessionContext {
let df = if let Some(schema) = schema {
options.schema = Some(&schema.0);
let read_future = self.ctx.read_avro(path, options);
-wait_for_future(py, read_future)?
+wait_for_future(py, read_future)??
} else {
let read_future = self.ctx.read_avro(path, options);
-wait_for_future(py, read_future)?
+wait_for_future(py, read_future)??
};
Ok(PyDataFrame::new(df))
}
@@ -1021,8 +1031,8 @@ impl PySessionContext {
let plan = plan.plan.clone();
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
rt.spawn(async move { plan.execute(part, Arc::new(ctx)) });
-let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
-Ok(PyRecordBatchStream::new(stream?))
+let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })??;
+Ok(PyRecordBatchStream::new(stream))
}
}

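The `execute` change above (and its twin in `src/dataframe.rs` below) has a third error layer to deal with: the work is spawned on a Tokio runtime, so awaiting the `JoinHandle` yields `Result<Result<_, DataFusionError>, JoinError>`. Wrapping the handle in an async block and `expect`ing away the join layer, as the new code does, reduces the output back to the two-level shape that `??` handles. A self-contained sketch of that flattening (assumes the `tokio` crate with the multi-threaded runtime feature enabled):

use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().expect("failed to build runtime");
    // JoinHandle<Result<u32, String>>: the task's own Result ends up nested
    // inside the join Result produced by `.await`.
    let handle = rt.spawn(async { Ok::<u32, String>(7) });
    // `.expect(...)` drops the JoinError layer (a panicked or cancelled task),
    // leaving the task's own Result -- the same move as
    // `async { fut.await.expect("Tokio task panicked") }` in this diff.
    let inner = rt.block_on(async { handle.await.expect("Tokio task panicked") });
    assert_eq!(inner, Ok(7));
}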
40 changes: 18 additions & 22 deletions src/dataframe.rs
@@ -233,7 +233,7 @@ impl PyDataFrame {
let (batches, has_more) = wait_for_future(
py,
collect_record_batches_to_display(self.df.as_ref().clone(), config),
-)?;
+)??;
if batches.is_empty() {
// This should not be reached, but do it for safety since we index into the vector below
return Ok("No data to display".to_string());
@@ -256,7 +256,7 @@ impl PyDataFrame {
let (batches, has_more) = wait_for_future(
py,
collect_record_batches_to_display(self.df.as_ref().clone(), config),
-)?;
+)??;
if batches.is_empty() {
// This should not be reached, but do it for safety since we index into the vector below
return Ok("No data to display".to_string());
@@ -288,7 +288,7 @@ impl PyDataFrame {
/// Calculate summary statistics for a DataFrame
fn describe(&self, py: Python) -> PyDataFusionResult<Self> {
let df = self.df.as_ref().clone();
-let stat_df = wait_for_future(py, df.describe())?;
+let stat_df = wait_for_future(py, df.describe())??;
Ok(Self::new(stat_df))
}

@@ -391,7 +391,7 @@ impl PyDataFrame {
/// Unless some order is specified in the plan, there is no
/// guarantee of the order of the result.
fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
-let batches = wait_for_future(py, self.df.as_ref().clone().collect())
+let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
.map_err(PyDataFusionError::from)?;
// cannot use PyResult<Vec<RecordBatch>> return type due to
// https://github.com/PyO3/pyo3/issues/1813
Expand All @@ -400,14 +400,14 @@ impl PyDataFrame {

/// Cache DataFrame.
fn cache(&self, py: Python) -> PyDataFusionResult<Self> {
-let df = wait_for_future(py, self.df.as_ref().clone().cache())?;
+let df = wait_for_future(py, self.df.as_ref().clone().cache())??;
Ok(Self::new(df))
}

/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
-let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())
+let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
.map_err(PyDataFusionError::from)?;

batches
@@ -511,7 +511,7 @@ impl PyDataFrame {

/// Get the execution plan for this `DataFrame`
fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
-let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())?;
+let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
Ok(plan.into())
}

@@ -624,7 +624,7 @@ impl PyDataFrame {
DataFrameWriteOptions::new(),
Some(csv_options),
),
-)?;
+)??;
Ok(())
}

@@ -685,7 +685,7 @@ impl PyDataFrame {
DataFrameWriteOptions::new(),
Option::from(options),
),
-)?;
+)??;
Ok(())
}

@@ -697,7 +697,7 @@ impl PyDataFrame {
.as_ref()
.clone()
.write_json(path, DataFrameWriteOptions::new(), None),
-)?;
+)??;
Ok(())
}

@@ -720,7 +720,7 @@ impl PyDataFrame {
py: Python<'py>,
requested_schema: Option<Bound<'py, PyCapsule>>,
) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
-let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
+let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())??;
let mut schema: Schema = self.df.schema().to_owned().into();

if let Some(schema_capsule) = requested_schema {
@@ -753,8 +753,8 @@ impl PyDataFrame {
let df = self.df.as_ref().clone();
let fut: JoinHandle<datafusion::common::Result<SendableRecordBatchStream>> =
rt.spawn(async move { df.execute_stream().await });
-let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
-Ok(PyRecordBatchStream::new(stream?))
+let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })??;
+Ok(PyRecordBatchStream::new(stream))
}

fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
@@ -763,14 +763,10 @@ impl PyDataFrame {
let df = self.df.as_ref().clone();
let fut: JoinHandle<datafusion::common::Result<Vec<SendableRecordBatchStream>>> =
rt.spawn(async move { df.execute_stream_partitioned().await });
-let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
+let stream = wait_for_future(py, async { fut.await.expect("Tokio task panicked") })?
+.map_err(py_datafusion_err)?;

-match stream {
-Ok(batches) => Ok(batches.into_iter().map(PyRecordBatchStream::new).collect()),
-_ => Err(PyValueError::new_err(
-"Unable to execute stream partitioned",
-)),
-}
+Ok(stream.into_iter().map(PyRecordBatchStream::new).collect())
}

/// Convert to pandas dataframe with pyarrow
@@ -815,7 +811,7 @@ impl PyDataFrame {

// Executes this DataFrame to get the total number of rows.
fn count(&self, py: Python) -> PyDataFusionResult<usize> {
-Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
+Ok(wait_for_future(py, self.df.as_ref().clone().count())??)
}

/// Fill null values with a specified value for specific columns
@@ -841,7 +837,7 @@ impl PyDataFrame {
/// Print DataFrame
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
// Get string representation of record batches
-let batches = wait_for_future(py, df.collect())?;
+let batches = wait_for_future(py, df.collect())??;
let batches_as_string = pretty::pretty_format_batches(&batches);
let result = match batches_as_string {
Ok(batch) => format!("DataFrame()\n{batch}"),
2 changes: 1 addition & 1 deletion src/record_batch.rs
@@ -63,7 +63,7 @@ impl PyRecordBatchStream {
impl PyRecordBatchStream {
fn next(&mut self, py: Python) -> PyResult<PyRecordBatch> {
let stream = self.stream.clone();
-wait_for_future(py, next_stream(stream, true))
+wait_for_future(py, next_stream(stream, true))?
}

fn __next__(&mut self, py: Python) -> PyResult<PyRecordBatch> {
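Note that this call site gains only a single `?`, not `??`: `next_stream` apparently already yields a `PyResult`, so the inner and outer error types coincide and one `?` flattens `PyResult<PyResult<PyRecordBatch>>` straight to the function's return type. A tiny sketch of that shape, with a string stand-in for `PyErr`:

type PyResult<T> = Result<T, String>; // stand-in for pyo3::PyResult

// Assumed shape: the wait wraps the future's output in a second PyResult.
fn wait_for_future<T>(output: PyResult<T>) -> PyResult<PyResult<T>> {
    Ok(output) // pretend the wait completed uninterrupted
}

fn next_batch() -> PyResult<u8> {
    // One `?` unwraps the wait; the inner PyResult already matches the
    // return type, mirroring `wait_for_future(py, next_stream(stream, true))?`.
    wait_for_future(Ok(1u8))?
}

fn main() {
    assert_eq!(next_batch(), Ok(1));
}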
11 changes: 6 additions & 5 deletions src/substrait.rs
@@ -72,7 +72,7 @@ impl PySubstraitSerializer {
path: &str,
py: Python,
) -> PyDataFusionResult<()> {
-wait_for_future(py, serializer::serialize(sql, &ctx.ctx, path))?;
+wait_for_future(py, serializer::serialize(sql, &ctx.ctx, path))??;
Ok(())
}

@@ -94,19 +94,20 @@ impl PySubstraitSerializer {
ctx: PySessionContext,
py: Python,
) -> PyDataFusionResult<PyObject> {
-let proto_bytes: Vec<u8> = wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx))?;
+let proto_bytes: Vec<u8> =
+wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx))??;
Ok(PyBytes::new(py, &proto_bytes).into())
}

#[staticmethod]
pub fn deserialize(path: &str, py: Python) -> PyDataFusionResult<PyPlan> {
-let plan = wait_for_future(py, serializer::deserialize(path))?;
+let plan = wait_for_future(py, serializer::deserialize(path))??;
Ok(PyPlan { plan: *plan })
}

#[staticmethod]
pub fn deserialize_bytes(proto_bytes: Vec<u8>, py: Python) -> PyDataFusionResult<PyPlan> {
-let plan = wait_for_future(py, serializer::deserialize_bytes(proto_bytes))?;
+let plan = wait_for_future(py, serializer::deserialize_bytes(proto_bytes))??;
Ok(PyPlan { plan: *plan })
}
}
@@ -143,7 +144,7 @@ impl PySubstraitConsumer {
) -> PyDataFusionResult<PyLogicalPlan> {
let session_state = ctx.ctx.state();
let result = consumer::from_substrait_plan(&session_state, &plan.plan);
-let logical_plan = wait_for_future(py, result)?;
+let logical_plan = wait_for_future(py, result)??;
Ok(PyLogicalPlan::new(logical_plan))
}
}