Skip to content

Commit d17ca68

Browse files
committed
feat: enter Tokio runtime integration before scan for TableProvider
1 parent ae483d4 commit d17ca68

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

docs/source/user-guide/io/table_provider.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,8 @@ to the ``SessionContext``.
5656
ctx.register_table_provider("my_table", provider)
5757
5858
ctx.table("my_table").show()
59+
60+
Using ``wait_for_future`` ensures the embedded Tokio runtime is active when
61+
DataFusion invokes ``TableProvider::scan``. Your implementation of ``scan`` can
62+
safely call ``tokio::spawn_blocking`` by entering the runtime with
63+
``datafusion.enter_tokio_runtime()`` when required.

src/utils.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use pyo3::prelude::*;
2626
use pyo3::types::PyCapsule;
2727
use std::future::Future;
2828
use std::sync::OnceLock;
29-
use tokio::runtime::Runtime;
29+
use tokio::runtime::{EnterGuard, Runtime};
3030

3131
/// Utility to get the Tokio Runtime from Python
3232
#[inline]
@@ -47,14 +47,23 @@ pub(crate) fn get_global_ctx() -> &'static SessionContext {
4747
CTX.get_or_init(SessionContext::new)
4848
}
4949

50+
/// Enter the Tokio runtime and return the guard
51+
#[inline]
52+
pub(crate) fn enter_tokio_runtime() -> EnterGuard<'static> {
53+
get_tokio_runtime().0.enter()
54+
}
55+
5056
/// Utility to collect rust futures with GIL released
5157
pub fn wait_for_future<F>(py: Python, f: F) -> F::Output
5258
where
5359
F: Future + Send,
5460
F::Output: Send,
5561
{
5662
let runtime: &Runtime = &get_tokio_runtime().0;
57-
py.allow_threads(|| runtime.block_on(f))
63+
py.allow_threads(|| {
64+
let _guard = enter_tokio_runtime();
65+
runtime.block_on(f)
66+
})
5867
}
5968

6069
pub(crate) fn parse_volatility(value: &str) -> PyDataFusionResult<Volatility> {

0 commit comments

Comments
 (0)