Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 16 additions & 44 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
prost = "0.14"
pyo3 = { version = "0.25", features = [
pyo3 = { version = "0.29", features = [
"extension-module",
"abi3-py310",
"anyhow",
"multiple-pymethods",
] }
pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] }
pythonize = "0.25"
pyo3-async-runtimes = { version = "0.29", features = ["tokio-runtime"] }
pythonize = "0.29"
temporalio-client = { version = "0.4", path = "./sdk-core/crates/client" }
temporalio-common = { version = "0.4", path = "./sdk-core/crates/common", features = [
"envconfig", "otel"
Expand Down
2 changes: 1 addition & 1 deletion temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ where
match res {
Ok(resp) => Ok(resp.get_ref().encode_to_vec()),
Err(err) => {
Python::with_gil(move |py| {
Python::attach(move |py| {
// Create tuple of "status", "message", and optional "details"
let code = err.code() as u32;
let message = err.message().to_owned();
Expand Down
18 changes: 9 additions & 9 deletions temporalio/bridge/src/envconfig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use temporalio_common::envconfig::{

pyo3::create_exception!(temporal_sdk_bridge, ConfigError, PyRuntimeError);

fn data_source_to_dict(py: Python, ds: &DataSource) -> PyResult<PyObject> {
fn data_source_to_dict(py: Python, ds: &DataSource) -> PyResult<Py<PyAny>> {
let dict = PyDict::new(py);
match ds {
DataSource::Path(p) => dict.set_item("path", p)?,
Expand All @@ -23,7 +23,7 @@ fn data_source_to_dict(py: Python, ds: &DataSource) -> PyResult<PyObject> {
Ok(dict.into())
}

fn tls_to_dict(py: Python, tls: &CoreClientConfigTLS) -> PyResult<PyObject> {
fn tls_to_dict(py: Python, tls: &CoreClientConfigTLS) -> PyResult<Py<PyAny>> {
let dict = PyDict::new(py);
dict.set_item("disabled", tls.disabled)?;
if let Some(v) = &tls.client_cert {
Expand All @@ -42,7 +42,7 @@ fn tls_to_dict(py: Python, tls: &CoreClientConfigTLS) -> PyResult<PyObject> {
Ok(dict.into())
}

fn codec_to_dict(py: Python, codec: &ClientConfigCodec) -> PyResult<PyObject> {
fn codec_to_dict(py: Python, codec: &ClientConfigCodec) -> PyResult<Py<PyAny>> {
let dict = PyDict::new(py);
if let Some(v) = &codec.endpoint {
dict.set_item("endpoint", v)?;
Expand All @@ -53,7 +53,7 @@ fn codec_to_dict(py: Python, codec: &ClientConfigCodec) -> PyResult<PyObject> {
Ok(dict.into())
}

fn profile_to_dict(py: Python, profile: &CoreClientConfigProfile) -> PyResult<PyObject> {
fn profile_to_dict(py: Python, profile: &CoreClientConfigProfile) -> PyResult<Py<PyAny>> {
let dict = PyDict::new(py);
if let Some(v) = &profile.address {
dict.set_item("address", v)?;
Expand All @@ -76,7 +76,7 @@ fn profile_to_dict(py: Python, profile: &CoreClientConfigProfile) -> PyResult<Py
Ok(dict.into())
}

fn core_config_to_dict(py: Python, core_config: &CoreClientConfig) -> PyResult<PyObject> {
fn core_config_to_dict(py: Python, core_config: &CoreClientConfig) -> PyResult<Py<PyAny>> {
let profiles_dict = PyDict::new(py);
for (name, profile) in &core_config.profiles {
let connect_dict = profile_to_dict(py, profile)?;
Expand All @@ -90,7 +90,7 @@ fn load_client_config_inner(
config_source: Option<DataSource>,
config_file_strict: bool,
env_vars: Option<HashMap<String, String>>,
) -> PyResult<PyObject> {
) -> PyResult<Py<PyAny>> {
let options = LoadClientConfigOptions {
config_source,
config_file_strict,
Expand All @@ -109,7 +109,7 @@ fn load_client_connect_config_inner(
disable_env: bool,
config_file_strict: bool,
env_vars: Option<HashMap<String, String>>,
) -> PyResult<PyObject> {
) -> PyResult<Py<PyAny>> {
let options = LoadClientConfigProfileOptions {
config_source,
config_file_profile: profile,
Expand All @@ -132,7 +132,7 @@ pub fn load_client_config(
data: Option<Vec<u8>>,
config_file_strict: bool,
env_vars: Option<HashMap<String, String>>,
) -> PyResult<PyObject> {
) -> PyResult<Py<PyAny>> {
let config_source = match (path, data) {
(Some(p), None) => Some(DataSource::Path(p)),
(None, Some(d)) => Some(DataSource::Data(d)),
Expand All @@ -158,7 +158,7 @@ pub fn load_client_connect_config(
disable_env: bool,
config_file_strict: bool,
env_vars: Option<HashMap<String, String>>,
) -> PyResult<PyObject> {
) -> PyResult<Py<PyAny>> {
let config_source = match (path, data) {
(Some(p), None) => Some(DataSource::Path(p)),
(None, Some(d)) => Some(DataSource::Data(d)),
Expand Down
6 changes: 3 additions & 3 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct MetricMeterRef {
default_attributes: MetricAttributesRef,
}

#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct MetricAttributesRef {
attrs: metrics::MetricAttributes,
Expand Down Expand Up @@ -216,7 +216,7 @@ impl MetricAttributesRef {
&self,
py: Python,
meter: &MetricMeterRef,
new_attrs: HashMap<String, PyObject>,
new_attrs: HashMap<String, Py<PyAny>>,
) -> PyResult<Self> {
let attrs = meter.meter.extend_attributes(
self.attrs.clone(),
Expand All @@ -234,7 +234,7 @@ impl MetricAttributesRef {
fn metric_key_value_from_py(
py: Python,
k: String,
obj: PyObject,
obj: Py<PyAny>,
) -> PyResult<metrics::MetricKeyValue> {
let val = if let Ok(v) = obj.extract::<String>(py) {
metrics::MetricValue::String(v)
Expand Down
22 changes: 13 additions & 9 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct TelemetryConfig {
#[derive(FromPyObject)]
pub struct LoggingConfig {
filter: String,
forward_to: Option<PyObject>,
forward_to: Option<Py<PyAny>>,
}

#[pyclass]
Expand Down Expand Up @@ -105,7 +105,7 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult<RuntimeRef> {
let telemetry_build = TelemetryOptions::builder();

// Build logging config, capturing forwarding info to start later
let mut log_forwarding: Option<(Receiver<CoreLog>, PyObject)> = None;
let mut log_forwarding: Option<(Receiver<CoreLog>, Py<PyAny>)> = None;
let maybe_logging = if let Some(logging_conf) = logging {
Some(if let Some(forward_to) = logging_conf.forward_to {
// Note, actual log forwarding is started later
Expand Down Expand Up @@ -177,7 +177,7 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult<RuntimeRef> {
.collect::<Vec<_>>();
// We silently swallow errors here because logging them could
// cause a bad loop and we don't want to assume console presence
let _ = Python::with_gil(|py| callback.call1(py, (entries,)));
let _ = Python::attach(|py| callback.call1(py, (entries,)));
}
}))
});
Expand All @@ -204,7 +204,7 @@ impl Runtime {
pub fn future_into_py<'a, F, T>(&self, py: Python<'a>, fut: F) -> PyResult<Bound<'a, PyAny>>
where
F: Future<Output = PyResult<T>> + Send + 'static,
T: for<'py> IntoPyObject<'py>,
T: for<'py> IntoPyObject<'py> + Send + 'static,
{
let _guard = self.core.tokio_handle().enter();
pyo3_async_runtimes::generic::future_into_py::<TokioRuntime, _, T>(py, fut)
Expand Down Expand Up @@ -310,7 +310,7 @@ impl BufferedLogEntry {
}

#[getter]
fn fields(&self, py: Python<'_>) -> PyResult<HashMap<&str, PyObject>> {
fn fields(&self, py: Python<'_>) -> PyResult<HashMap<&str, Py<PyAny>>> {
self.core_log
.fields
.iter()
Expand Down Expand Up @@ -413,6 +413,13 @@ impl pyo3_async_runtimes::generic::Runtime for TokioRuntime {
{
tokio::runtime::Handle::current().spawn(fut)
}

fn spawn_blocking<F>(f: F) -> Self::JoinHandle
where
F: FnOnce() + Send + 'static,
{
tokio::runtime::Handle::current().spawn_blocking(f)
}
}

impl pyo3_async_runtimes::generic::ContextExt for TokioRuntime {
Expand All @@ -431,10 +438,7 @@ impl pyo3_async_runtimes::generic::ContextExt for TokioRuntime {

fn get_task_locals() -> Option<pyo3_async_runtimes::TaskLocals> {
TASK_LOCALS
.try_with(|c| {
c.get()
.map(|locals| Python::with_gil(|py| locals.clone_ref(py)))
})
.try_with(|c| c.get().cloned())
.unwrap_or_default()
}
}
Loading