Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,10 +683,12 @@ impl WorkerRef {
.map_err(|err| PyValueError::new_err(format!("Failed replacing client: {err}")))
}

fn initiate_shutdown(&self) -> PyResult<()> {
fn initiate_shutdown<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let worker = self.worker.as_ref().unwrap().clone();
worker.initiate_shutdown();
Ok(())
self.runtime.future_into_py(py, async move {
worker.initiate_shutdown().await;
Ok(())
})
}

fn finalize_shutdown<'p>(&mut self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
Expand Down
4 changes: 2 additions & 2 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ def replace_client(self, client: temporalio.bridge.client.Client) -> None:
"""Replace the worker client."""
self._ref.replace_client(client._ref) # type: ignore[reportOptionalMemberAccess]

def initiate_shutdown(self) -> None:
async def initiate_shutdown(self) -> None:
"""Start shutdown of the worker."""
self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess]
await self._ref.initiate_shutdown() # type: ignore[reportOptionalMemberAccess]

async def finalize_shutdown(self) -> None:
"""Finalize the worker.
Expand Down
2 changes: 1 addition & 1 deletion temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
# We must shutdown here
try:
if bridge_worker_scope is not None:
bridge_worker_scope.initiate_shutdown()
await bridge_worker_scope.initiate_shutdown()
await bridge_worker_scope.finalize_shutdown()
except Exception:
logger.warning("Failed to finalize shutdown", exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ async def raise_on_shutdown():
)

# Initiate core worker shutdown
self._bridge_worker.initiate_shutdown()
await self._bridge_worker.initiate_shutdown()

# If any worker task had an exception, replace that task with a queue drain
for worker, task in tasks.items():
Expand Down
2 changes: 1 addition & 1 deletion temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ async def _handle_cache_eviction(
except Exception as e:
self._throw_after_activation = e
logger.debug("Shutting down worker on eviction hook exception")
self._bridge_worker().initiate_shutdown()
await self._bridge_worker().initiate_shutdown()

def _create_workflow_instance(
self,
Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.1-server-1.31.0-151.0"
DEV_SERVER_DOWNLOAD_VERSION = "v1.6.2-server-1.31.0-151.6"
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
"history.enableChasm=true",
"--dynamic-config-value",
"history.enableTransitionHistory=true",
"--dynamic-config-value",
"frontend.enableCancelWorkerPollsOnShutdown=true",
],
dev_server_download_version=DEV_SERVER_DOWNLOAD_VERSION,
)
Expand Down
Loading