From c8ba007c58c1d223094709ebb6ee0df787d36e62 Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Thu, 12 Jun 2025 04:44:00 -0400 Subject: [PATCH 1/3] Switch shutdown order from all-at-once to depth-first. This commit changes the shutdown order of the subsystems. Previously, when Ctrl-C was received, all subsystems would simultaniously receive the shutdown signal. Now, all parent subsystems receive the shutdown signal first, then propagate the signal to their children. They also wait for their children to shutdown before shutting down themselves. This will allow us to control the order in which shutdown occurs. For example, we can wait for channels to drain before closing them, or differentiate between Ctrl-C shutdowns and error shutdowns. --- src/cmd/run.rs | 2 +- src/subsystems/controller/mod.rs | 37 ++++++++++++++++------------ src/subsystems/controller/monitor.rs | 9 ++++--- src/subsystems/ingress/mod.rs | 2 ++ src/subsystems/monitor/mod.rs | 2 ++ src/subsystems/platform/mod.rs | 2 ++ src/subsystems/relay/lock_mgmt.rs | 2 ++ src/subsystems/relay/mod.rs | 8 +++--- src/subsystems/relay/poll_state.rs | 2 ++ 9 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/cmd/run.rs b/src/cmd/run.rs index eb55b07..4938c2c 100644 --- a/src/cmd/run.rs +++ b/src/cmd/run.rs @@ -178,7 +178,7 @@ impl Run { s.start(SubsystemBuilder::new( CONTROLLER_SUBSYSTEM_NAME, controller.into_subsystem(), - )); + ).detached()); }) .catch_signals() .handle_shutdown_requests(Duration::from_millis(DEFAULT_SHUTDOWN_TIMEOUT)) diff --git a/src/subsystems/controller/mod.rs b/src/subsystems/controller/mod.rs index 48d1ea4..261cee8 100644 --- a/src/subsystems/controller/mod.rs +++ b/src/subsystems/controller/mod.rs @@ -79,29 +79,34 @@ impl IntoSubsystem for ControllerSubsystem { .build(); // • Start the ingress subsystem. - subsys.start(SubsystemBuilder::new( - INGRESS_SUBSYSTEM_NAME, - ingress_subsystem.into_subsystem(), - )); + subsys.start( + SubsystemBuilder::new(INGRESS_SUBSYSTEM_NAME, ingress_subsystem.into_subsystem()) + .detached(), + ); // • Start the platform subsystem. - subsys.start(SubsystemBuilder::new( - PLATFORM_SUBSYSTEM_NAME, - platform_subsystem.into_subsystem(), - )); + subsys.start( + SubsystemBuilder::new(PLATFORM_SUBSYSTEM_NAME, platform_subsystem.into_subsystem()) + .detached(), + ); // • Start the MonitorController subsytem. - subsys.start(SubsystemBuilder::new( - MONITOR_CONTROLLER_SUBSYSTEM_NAME, - monitor_controller.into_subsystem(), - )); + subsys.start( + SubsystemBuilder::new( + MONITOR_CONTROLLER_SUBSYSTEM_NAME, + monitor_controller.into_subsystem(), + ) + .detached(), + ); // • Start the relay subsystem. - subsys.start(SubsystemBuilder::new( - RELAY_SUBSYSTEM_NAME, - relay_subsystem.into_subsystem(), - )); + subsys.start( + SubsystemBuilder::new(RELAY_SUBSYSTEM_NAME, relay_subsystem.into_subsystem()) + .detached(), + ); + subsys.on_shutdown_requested().await; + subsys.request_local_shutdown(); subsys.wait_for_children().await; Ok(()) } diff --git a/src/subsystems/controller/monitor.rs b/src/subsystems/controller/monitor.rs index af06826..1ee7880 100644 --- a/src/subsystems/controller/monitor.rs +++ b/src/subsystems/controller/monitor.rs @@ -136,10 +136,10 @@ impl IntoSubsystem for MonitorController { let mut handle = monitor_subsystem.handle(); // • Launch the subsystem. - subsys.start(SubsystemBuilder::new( - MONITOR_SUBSYSTEM_NAME, - monitor_subsystem.into_subsystem(), - )); + subsys.start( + SubsystemBuilder::new(MONITOR_SUBSYSTEM_NAME, monitor_subsystem.into_subsystem()) + .detached(), + ); // Now, we can periodically poll the monitor for // new data. @@ -163,6 +163,7 @@ impl IntoSubsystem for MonitorController { // the monitor is shut down. // NB: We can't implement the shutdown trait because // self has been partially moved. + subsys.request_local_shutdown(); subsys.wait_for_children().await; return Ok(()); } diff --git a/src/subsystems/ingress/mod.rs b/src/subsystems/ingress/mod.rs index 3ab2207..69f9a47 100644 --- a/src/subsystems/ingress/mod.rs +++ b/src/subsystems/ingress/mod.rs @@ -88,6 +88,8 @@ impl IntoSubsystem for IngressSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); + subsys.wait_for_children().await; return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/monitor/mod.rs b/src/subsystems/monitor/mod.rs index 3c5ae43..f40a0a6 100644 --- a/src/subsystems/monitor/mod.rs +++ b/src/subsystems/monitor/mod.rs @@ -81,6 +81,8 @@ impl IntoSubsystem for MonitorSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); + subsys.wait_for_children().await; return self.shutdown().await; } _ = self.shutdown.recv() => { diff --git a/src/subsystems/platform/mod.rs b/src/subsystems/platform/mod.rs index ef34793..8dad214 100644 --- a/src/subsystems/platform/mod.rs +++ b/src/subsystems/platform/mod.rs @@ -97,6 +97,8 @@ impl IntoSubsystem for PlatformSubsystem { select! { // Shutdown comes first so it has high priority. _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); + subsys.wait_for_children().await; return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/relay/lock_mgmt.rs b/src/subsystems/relay/lock_mgmt.rs index 351d3f1..9b3d321 100644 --- a/src/subsystems/relay/lock_mgmt.rs +++ b/src/subsystems/relay/lock_mgmt.rs @@ -79,6 +79,8 @@ impl IntoSubsystem for LockManager { } } _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); + subsys.wait_for_children().await; // Release the lock. return self.shutdown().await; } diff --git a/src/subsystems/relay/mod.rs b/src/subsystems/relay/mod.rs index 556d5b0..db6cead 100644 --- a/src/subsystems/relay/mod.rs +++ b/src/subsystems/relay/mod.rs @@ -92,16 +92,14 @@ impl IntoSubsystem for RelaySubsystem { // Kick off a task to poll the backend for new states. let mut poller = self.new_poller(); let mut state_stream = poller.take_stream()?; - subsys.start(SubsystemBuilder::new( - "StatePoller", - poller.into_subsystem(), - )); + subsys.start(SubsystemBuilder::new("StatePoller", poller.into_subsystem()).detached()); let mut observations = self.observations; loop { select! { // Besides that, we can just hang out. _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); subsys.wait_for_children().await; return Ok(()); } @@ -133,7 +131,7 @@ impl IntoSubsystem for RelaySubsystem { subsys.start(SubsystemBuilder::new( format!("LockManager {}", state_id), lock_manager.into_subsystem(), - )); + ).detached()); // Now that we have the lock managed, we // need to tell the Platform/Ingress // to effect the state. diff --git a/src/subsystems/relay/poll_state.rs b/src/subsystems/relay/poll_state.rs index e0454c3..1be8d8b 100644 --- a/src/subsystems/relay/poll_state.rs +++ b/src/subsystems/relay/poll_state.rs @@ -70,6 +70,8 @@ impl IntoSubsystem for StatePoller { loop { select! { _ = subsys.on_shutdown_requested() => { + subsys.request_local_shutdown(); + subsys.wait_for_children().await; return self.shutdown().await } _ = self.timer.tick() => { From a1e055625c2629d09078a3ba31a749dfa96d4f1b Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Thu, 12 Jun 2025 10:47:37 -0400 Subject: [PATCH 2/3] Adjust the order of subsystem shutdown. This commit changes subsystem shutdown in the controller to be more careful about order and parentage to ensure we're shutting down our subsystems without writing to closed channels. --- src/cmd/run.rs | 2 +- src/subsystems/controller/mod.rs | 38 ++++++++++++++++++++-------- src/subsystems/controller/monitor.rs | 9 +++---- src/subsystems/relay/mod.rs | 13 +++++++--- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/src/cmd/run.rs b/src/cmd/run.rs index 4938c2c..eb55b07 100644 --- a/src/cmd/run.rs +++ b/src/cmd/run.rs @@ -178,7 +178,7 @@ impl Run { s.start(SubsystemBuilder::new( CONTROLLER_SUBSYSTEM_NAME, controller.into_subsystem(), - ).detached()); + )); }) .catch_signals() .handle_shutdown_requests(Duration::from_millis(DEFAULT_SHUTDOWN_TIMEOUT)) diff --git a/src/subsystems/controller/mod.rs b/src/subsystems/controller/mod.rs index 261cee8..1d5860b 100644 --- a/src/subsystems/controller/mod.rs +++ b/src/subsystems/controller/mod.rs @@ -79,35 +79,51 @@ impl IntoSubsystem for ControllerSubsystem { .build(); // • Start the ingress subsystem. - subsys.start( + let ingress_subsys = subsys.start( SubsystemBuilder::new(INGRESS_SUBSYSTEM_NAME, ingress_subsystem.into_subsystem()) .detached(), ); // • Start the platform subsystem. - subsys.start( + let platform_subsys = subsys.start( SubsystemBuilder::new(PLATFORM_SUBSYSTEM_NAME, platform_subsystem.into_subsystem()) .detached(), ); // • Start the MonitorController subsytem. - subsys.start( - SubsystemBuilder::new( - MONITOR_CONTROLLER_SUBSYSTEM_NAME, - monitor_controller.into_subsystem(), - ) - .detached(), - ); + // The MonitorController and Monitor don't need to be + // detached because they can be shutdown in tandem. + // We need them to drop their channels to signal to + // the other subsystems why the shutdown has occurred. + subsys.start(SubsystemBuilder::new( + MONITOR_CONTROLLER_SUBSYSTEM_NAME, + monitor_controller.into_subsystem(), + )); // • Start the relay subsystem. - subsys.start( + let relay_subsys = subsys.start( SubsystemBuilder::new(RELAY_SUBSYSTEM_NAME, relay_subsystem.into_subsystem()) .detached(), ); subsys.on_shutdown_requested().await; - subsys.request_local_shutdown(); + // Waiting for children will block until the Monitor and + // MonitorController are shut down. subsys.wait_for_children().await; + // Next, we wait for the relay, because we need to abandon + // any state locks that we've taken before we can roll back + // the ingress or yank the platform. + relay_subsys.initiate_shutdown(); + relay_subsys.join().await?; + + ingress_subsys.initiate_shutdown(); + ingress_subsys.join().await?; + + platform_subsys.initiate_shutdown(); + platform_subsys.join().await?; + + // TODO: Tell the backend to mark the rollout as + // cancelled (if it isn't already marked as completed). Ok(()) } } diff --git a/src/subsystems/controller/monitor.rs b/src/subsystems/controller/monitor.rs index 1ee7880..af06826 100644 --- a/src/subsystems/controller/monitor.rs +++ b/src/subsystems/controller/monitor.rs @@ -136,10 +136,10 @@ impl IntoSubsystem for MonitorController { let mut handle = monitor_subsystem.handle(); // • Launch the subsystem. - subsys.start( - SubsystemBuilder::new(MONITOR_SUBSYSTEM_NAME, monitor_subsystem.into_subsystem()) - .detached(), - ); + subsys.start(SubsystemBuilder::new( + MONITOR_SUBSYSTEM_NAME, + monitor_subsystem.into_subsystem(), + )); // Now, we can periodically poll the monitor for // new data. @@ -163,7 +163,6 @@ impl IntoSubsystem for MonitorController { // the monitor is shut down. // NB: We can't implement the shutdown trait because // self has been partially moved. - subsys.request_local_shutdown(); subsys.wait_for_children().await; return Ok(()); } diff --git a/src/subsystems/relay/mod.rs b/src/subsystems/relay/mod.rs index db6cead..7178658 100644 --- a/src/subsystems/relay/mod.rs +++ b/src/subsystems/relay/mod.rs @@ -92,7 +92,10 @@ impl IntoSubsystem for RelaySubsystem { // Kick off a task to poll the backend for new states. let mut poller = self.new_poller(); let mut state_stream = poller.take_stream()?; - subsys.start(SubsystemBuilder::new("StatePoller", poller.into_subsystem()).detached()); + subsys.start(SubsystemBuilder::new( + "StatePoller", + poller.into_subsystem(), + )); let mut observations = self.observations; loop { @@ -100,6 +103,8 @@ impl IntoSubsystem for RelaySubsystem { // Besides that, we can just hang out. _ = subsys.on_shutdown_requested() => { subsys.request_local_shutdown(); + // Waiting for children ensure that all of the locks + // we've taken have been released. subsys.wait_for_children().await; return Ok(()); } @@ -112,7 +117,7 @@ impl IntoSubsystem for RelaySubsystem { } else { // The stream has been closed, so we should shutdown. debug!("Shutting down in relay"); - subsys.request_shutdown(); + subsys.request_local_shutdown(); } } // • We also need to poll the backend for new states. @@ -131,7 +136,7 @@ impl IntoSubsystem for RelaySubsystem { subsys.start(SubsystemBuilder::new( format!("LockManager {}", state_id), lock_manager.into_subsystem(), - ).detached()); + )); // Now that we have the lock managed, we // need to tell the Platform/Ingress // to effect the state. @@ -186,7 +191,7 @@ impl IntoSubsystem for RelaySubsystem { } } else { // The stream has been closed, so we should shutdown. - subsys.request_shutdown(); + subsys.request_local_shutdown(); } } } From 7198c77b14bee74672842d7fe0576578282e42a5 Mon Sep 17 00:00:00 2001 From: Eric Ghildyal Date: Fri, 13 Jun 2025 12:34:21 -0400 Subject: [PATCH 3/3] Add so many traces to try and find the deadlock --- src/adapters/ingresses/cloudflare.rs | 3 ++- src/subsystems/controller/mod.rs | 26 ++++++++++++++++++++++++++ src/subsystems/controller/monitor.rs | 8 +++++++- src/subsystems/ingress/mod.rs | 7 ++++++- src/subsystems/monitor/mod.rs | 12 ++++++++++-- src/subsystems/platform/mod.rs | 7 ++++++- src/subsystems/relay/lock_mgmt.rs | 13 ++++++++++++- src/subsystems/relay/mod.rs | 8 ++++++-- src/subsystems/relay/poll_state.rs | 7 +++++++ 9 files changed, 82 insertions(+), 9 deletions(-) diff --git a/src/adapters/ingresses/cloudflare.rs b/src/adapters/ingresses/cloudflare.rs index 30550bb..68389b4 100644 --- a/src/adapters/ingresses/cloudflare.rs +++ b/src/adapters/ingresses/cloudflare.rs @@ -11,7 +11,7 @@ use crate::{ use super::Ingress; use async_trait::async_trait; use derive_getters::Getters; -use miette::Result; +use miette::{Result, bail}; use tracing::{debug, info}; #[derive(Getters)] @@ -72,6 +72,7 @@ impl Ingress for CloudflareWorkerIngress { async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> { info!("Setting Cloudflare canary traffic to {percent}."); + // bail!("BAIL"); let control_version = DeploymentVersion::builder() .percentage((100 - percent.clone().as_i32()) as u64) .version_id(self.control_version_id.clone().unwrap()) diff --git a/src/subsystems/controller/mod.rs b/src/subsystems/controller/mod.rs index 1d5860b..897eff8 100644 --- a/src/subsystems/controller/mod.rs +++ b/src/subsystems/controller/mod.rs @@ -106,24 +106,50 @@ impl IntoSubsystem for ControllerSubsystem { .detached(), ); + trace!("Controller waiting for shutdown request..."); subsys.on_shutdown_requested().await; + subsys.initiate_shutdown(); + trace!("Controller shutdown requested!"); // Waiting for children will block until the Monitor and // MonitorController are shut down. + trace!("Contoller waiting for children to shutdown"); subsys.wait_for_children().await; + trace!("Controller children shutdown"); // Next, we wait for the relay, because we need to abandon // any state locks that we've taken before we can roll back // the ingress or yank the platform. + trace!("Controller waiting for relay to shutdown"); relay_subsys.initiate_shutdown(); + trace!("Controller waiting for relay to shutdown complete"); + trace!("Controller waiting for relay to join"); relay_subsys.join().await?; + trace!("Relay joined"); + trace!("Relay shutdown!"); + + trace!("Controller waiting for ingress to shutdown"); ingress_subsys.initiate_shutdown(); + trace!("Controller waiting for ingress to shutdown complete"); + trace!("Controller waiting for ingress to join"); ingress_subsys.join().await?; + trace!("Ingress joined"); + + trace!("Ingress shutdown!"); + trace!("Controller waiting for platform to shutdown"); platform_subsys.initiate_shutdown(); + trace!("Controller waiting for platform to shutdown complete"); + trace!("Controller waiting for platform to join"); platform_subsys.join().await?; + trace!("Platform joined"); + + trace!("Platform shutdown!"); // TODO: Tell the backend to mark the rollout as // cancelled (if it isn't already marked as completed). + trace!("TODO: API CALL TO CANCEL ROLLOUT"); + + trace!("Controller shutdown complete!"); Ok(()) } } diff --git a/src/subsystems/controller/monitor.rs b/src/subsystems/controller/monitor.rs index af06826..4d21247 100644 --- a/src/subsystems/controller/monitor.rs +++ b/src/subsystems/controller/monitor.rs @@ -11,7 +11,7 @@ use tokio::{ }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle}; use tokio_stream::{Stream, StreamExt as _, wrappers::IntervalStream}; -use tracing::debug; +use tracing::{debug, trace}; use crate::{ MonitorSubsystem, @@ -157,13 +157,19 @@ impl IntoSubsystem for MonitorController { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("MonitorController received shutdown request."); // If we've received the shutdown signal, // we don't have anything to do except ensure // our children have shutdown, guaranteeing // the monitor is shut down. // NB: We can't implement the shutdown trait because // self has been partially moved. + subsys.request_local_shutdown(); + trace!("MonitorController requested local shutdown complete."); + + trace!("MonitorController waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("MonitorController children shutdown complete."); return Ok(()); } baseline_version_id = self.baseline_receiver.recv() => { diff --git a/src/subsystems/ingress/mod.rs b/src/subsystems/ingress/mod.rs index 69f9a47..7aa26cd 100644 --- a/src/subsystems/ingress/mod.rs +++ b/src/subsystems/ingress/mod.rs @@ -6,7 +6,7 @@ use miette::{Report, Result}; use tokio::sync::mpsc::channel; use tokio::{select, sync::mpsc::Receiver}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use crate::adapters::BoxedIngress; @@ -88,8 +88,13 @@ impl IntoSubsystem for IngressSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("IngressSubsystem received shutdown request."); subsys.request_local_shutdown(); + trace!("IngressSubsystem requested local shutdown complete."); + + trace!("IngressSubsystem waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("IngressSubsystem children shutdown complete."); return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/monitor/mod.rs b/src/subsystems/monitor/mod.rs index f40a0a6..9545254 100644 --- a/src/subsystems/monitor/mod.rs +++ b/src/subsystems/monitor/mod.rs @@ -11,7 +11,7 @@ use tokio::{ sync::mpsc::{Receiver, channel}, }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use super::handle::Handle; use super::{ShutdownResult, Shutdownable}; @@ -81,8 +81,13 @@ impl IntoSubsystem for MonitorSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("MonitorSubsystem received shutdown request."); subsys.request_local_shutdown(); + trace!("MonitorSubsystem requested local shutdown complete."); + + trace!("MonitorSubsystem waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("MonitorSubsystem children shutdown complete."); return self.shutdown().await; } _ = self.shutdown.recv() => { @@ -104,9 +109,12 @@ impl IntoSubsystem for MonitorSubsystem { #[async_trait] impl Shutdownable for MonitorSubsystem { async fn shutdown(&mut self) -> ShutdownResult { + trace!("Shutting down MonitorSubsystem..."); // We just have to shut the monitor down manually, // since we have an exclusive lock on it. - self.monitor.shutdown().await + let _ = self.monitor.shutdown().await; + trace!("MonitorSubsystem shut down!"); + Ok(()) } } diff --git a/src/subsystems/platform/mod.rs b/src/subsystems/platform/mod.rs index 8dad214..4e3d0ee 100644 --- a/src/subsystems/platform/mod.rs +++ b/src/subsystems/platform/mod.rs @@ -8,7 +8,7 @@ use tokio::{ sync::mpsc::{self, Receiver, channel}, }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use crate::adapters::BoxedPlatform; @@ -97,8 +97,13 @@ impl IntoSubsystem for PlatformSubsystem { select! { // Shutdown comes first so it has high priority. _ = subsys.on_shutdown_requested() => { + trace!("PlatformSubsystem received shutdown request."); subsys.request_local_shutdown(); + trace!("PlatformSubsystem requested local shutdown complete."); + + trace!("PlatformSubsystem waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("PlatformSubsystem children shutdown complete."); return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/relay/lock_mgmt.rs b/src/subsystems/relay/lock_mgmt.rs index 9b3d321..614c80b 100644 --- a/src/subsystems/relay/lock_mgmt.rs +++ b/src/subsystems/relay/lock_mgmt.rs @@ -8,6 +8,7 @@ use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::oneshot; use tokio::time::{Interval, interval}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; +use tracing::{debug, trace}; use crate::{ Shutdownable, @@ -44,7 +45,9 @@ impl LockManager { ) -> Result { let (done_sender, task_done) = mpsc::channel(1); // Take the initial lock. + trace!("LockManager taking initial lock..."); let locked_state = backend.lock_state(&metadata, &state, done_sender).await?; + trace!("LockManager taking initial lock complete."); let freq = *locked_state.frequency(); let timer = interval(freq / 2); Ok(Self { @@ -79,8 +82,13 @@ impl IntoSubsystem for LockManager { } } _ = subsys.on_shutdown_requested() => { + trace!("LockManager received shutdown request."); subsys.request_local_shutdown(); + trace!("LockManager requested local shutdown complete."); + + trace!("LockManager waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("LockManager children shutdown complete."); // Release the lock. return self.shutdown().await; } @@ -96,7 +104,10 @@ impl IntoSubsystem for LockManager { #[async_trait] impl Shutdownable for LockManager { async fn shutdown(&mut self) -> ShutdownResult { + trace!("LockManager shutting down..."); // Release any of the locks we've taken. - self.backend.abandon_lock(&self.meta, &self.state).await + let _ = self.backend.abandon_lock(&self.meta, &self.state).await; + trace!("LockManager shut down!"); + Ok(()) } } diff --git a/src/subsystems/relay/mod.rs b/src/subsystems/relay/mod.rs index 7178658..88228ee 100644 --- a/src/subsystems/relay/mod.rs +++ b/src/subsystems/relay/mod.rs @@ -102,10 +102,14 @@ impl IntoSubsystem for RelaySubsystem { select! { // Besides that, we can just hang out. _ = subsys.on_shutdown_requested() => { + trace!("RelaySubsystem received shutdown request."); subsys.request_local_shutdown(); + trace!("RelaySubsystem requested local shutdown complete."); // Waiting for children ensure that all of the locks // we've taken have been released. + trace!("RelaySubsystem waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("RelaySubstsem children shutdown complete"); return Ok(()); } // • When we start the RelaySubsystem, @@ -116,7 +120,6 @@ impl IntoSubsystem for RelaySubsystem { self.backend.upload_observations(&self.meta, batch).await?; } else { // The stream has been closed, so we should shutdown. - debug!("Shutting down in relay"); subsys.request_local_shutdown(); } } @@ -175,7 +178,7 @@ impl IntoSubsystem for RelaySubsystem { let percent = WholePercent::try_from(percent_traffic).unwrap(); self.ingress.set_canary_traffic(percent).await?; - locked_state.mark_done().await?; + // locked_state.mark_done().await?; }, RollbackCanary => { // Set traffic to 0 immediately. @@ -191,6 +194,7 @@ impl IntoSubsystem for RelaySubsystem { } } else { // The stream has been closed, so we should shutdown. + trace!("Shutting down in relay from closed state stream"); subsys.request_local_shutdown(); } } diff --git a/src/subsystems/relay/poll_state.rs b/src/subsystems/relay/poll_state.rs index 1be8d8b..6e071e6 100644 --- a/src/subsystems/relay/poll_state.rs +++ b/src/subsystems/relay/poll_state.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use bon::bon; use miette::{IntoDiagnostic, Report, Result}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; +use tracing::trace; use crate::{ Shutdownable, @@ -70,8 +71,13 @@ impl IntoSubsystem for StatePoller { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("StatePoller received shutdown request"); subsys.request_local_shutdown(); + trace!("StatePoller requested local shutdown complete."); + + trace!("StatePoller waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("StatePoller children shutdown complete."); return self.shutdown().await } _ = self.timer.tick() => { @@ -90,6 +96,7 @@ impl IntoSubsystem for StatePoller { #[async_trait] impl Shutdownable for StatePoller { async fn shutdown(&mut self) -> ShutdownResult { + trace!("StatePoller shut down!"); // Nothing to do! We just stop polling. Ok(()) }