diff --git a/src/adapters/ingresses/cloudflare.rs b/src/adapters/ingresses/cloudflare.rs index 30550bb..68389b4 100644 --- a/src/adapters/ingresses/cloudflare.rs +++ b/src/adapters/ingresses/cloudflare.rs @@ -11,7 +11,7 @@ use crate::{ use super::Ingress; use async_trait::async_trait; use derive_getters::Getters; -use miette::Result; +use miette::{Result, bail}; use tracing::{debug, info}; #[derive(Getters)] @@ -72,6 +72,7 @@ impl Ingress for CloudflareWorkerIngress { async fn set_canary_traffic(&mut self, percent: WholePercent) -> Result<()> { info!("Setting Cloudflare canary traffic to {percent}."); + // bail!("BAIL"); let control_version = DeploymentVersion::builder() .percentage((100 - percent.clone().as_i32()) as u64) .version_id(self.control_version_id.clone().unwrap()) diff --git a/src/subsystems/controller/mod.rs b/src/subsystems/controller/mod.rs index 48d1ea4..897eff8 100644 --- a/src/subsystems/controller/mod.rs +++ b/src/subsystems/controller/mod.rs @@ -79,30 +79,77 @@ impl IntoSubsystem for ControllerSubsystem { .build(); // • Start the ingress subsystem. - subsys.start(SubsystemBuilder::new( - INGRESS_SUBSYSTEM_NAME, - ingress_subsystem.into_subsystem(), - )); + let ingress_subsys = subsys.start( + SubsystemBuilder::new(INGRESS_SUBSYSTEM_NAME, ingress_subsystem.into_subsystem()) + .detached(), + ); // • Start the platform subsystem. - subsys.start(SubsystemBuilder::new( - PLATFORM_SUBSYSTEM_NAME, - platform_subsystem.into_subsystem(), - )); + let platform_subsys = subsys.start( + SubsystemBuilder::new(PLATFORM_SUBSYSTEM_NAME, platform_subsystem.into_subsystem()) + .detached(), + ); // • Start the MonitorController subsytem. + // The MonitorController and Monitor don't need to be + // detached because they can be shutdown in tandem. + // We need them to drop their channels to signal to + // the other subsystems why the shutdown has occurred. subsys.start(SubsystemBuilder::new( MONITOR_CONTROLLER_SUBSYSTEM_NAME, monitor_controller.into_subsystem(), )); // • Start the relay subsystem. - subsys.start(SubsystemBuilder::new( - RELAY_SUBSYSTEM_NAME, - relay_subsystem.into_subsystem(), - )); - + let relay_subsys = subsys.start( + SubsystemBuilder::new(RELAY_SUBSYSTEM_NAME, relay_subsystem.into_subsystem()) + .detached(), + ); + + trace!("Controller waiting for shutdown request..."); + subsys.on_shutdown_requested().await; + subsys.initiate_shutdown(); + trace!("Controller shutdown requested!"); + // Waiting for children will block until the Monitor and + // MonitorController are shut down. + trace!("Contoller waiting for children to shutdown"); subsys.wait_for_children().await; + trace!("Controller children shutdown"); + // Next, we wait for the relay, because we need to abandon + // any state locks that we've taken before we can roll back + // the ingress or yank the platform. + trace!("Controller waiting for relay to shutdown"); + relay_subsys.initiate_shutdown(); + trace!("Controller waiting for relay to shutdown complete"); + trace!("Controller waiting for relay to join"); + relay_subsys.join().await?; + trace!("Relay joined"); + + trace!("Relay shutdown!"); + + trace!("Controller waiting for ingress to shutdown"); + ingress_subsys.initiate_shutdown(); + trace!("Controller waiting for ingress to shutdown complete"); + trace!("Controller waiting for ingress to join"); + ingress_subsys.join().await?; + trace!("Ingress joined"); + + trace!("Ingress shutdown!"); + + trace!("Controller waiting for platform to shutdown"); + platform_subsys.initiate_shutdown(); + trace!("Controller waiting for platform to shutdown complete"); + trace!("Controller waiting for platform to join"); + platform_subsys.join().await?; + trace!("Platform joined"); + + trace!("Platform shutdown!"); + + // TODO: Tell the backend to mark the rollout as + // cancelled (if it isn't already marked as completed). + trace!("TODO: API CALL TO CANCEL ROLLOUT"); + + trace!("Controller shutdown complete!"); Ok(()) } } diff --git a/src/subsystems/controller/monitor.rs b/src/subsystems/controller/monitor.rs index af06826..4d21247 100644 --- a/src/subsystems/controller/monitor.rs +++ b/src/subsystems/controller/monitor.rs @@ -11,7 +11,7 @@ use tokio::{ }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle}; use tokio_stream::{Stream, StreamExt as _, wrappers::IntervalStream}; -use tracing::debug; +use tracing::{debug, trace}; use crate::{ MonitorSubsystem, @@ -157,13 +157,19 @@ impl IntoSubsystem for MonitorController { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("MonitorController received shutdown request."); // If we've received the shutdown signal, // we don't have anything to do except ensure // our children have shutdown, guaranteeing // the monitor is shut down. // NB: We can't implement the shutdown trait because // self has been partially moved. + subsys.request_local_shutdown(); + trace!("MonitorController requested local shutdown complete."); + + trace!("MonitorController waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("MonitorController children shutdown complete."); return Ok(()); } baseline_version_id = self.baseline_receiver.recv() => { diff --git a/src/subsystems/ingress/mod.rs b/src/subsystems/ingress/mod.rs index 3ab2207..7aa26cd 100644 --- a/src/subsystems/ingress/mod.rs +++ b/src/subsystems/ingress/mod.rs @@ -6,7 +6,7 @@ use miette::{Report, Result}; use tokio::sync::mpsc::channel; use tokio::{select, sync::mpsc::Receiver}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use crate::adapters::BoxedIngress; @@ -88,6 +88,13 @@ impl IntoSubsystem for IngressSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("IngressSubsystem received shutdown request."); + subsys.request_local_shutdown(); + trace!("IngressSubsystem requested local shutdown complete."); + + trace!("IngressSubsystem waiting for children to shutdown."); + subsys.wait_for_children().await; + trace!("IngressSubsystem children shutdown complete."); return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/monitor/mod.rs b/src/subsystems/monitor/mod.rs index 3c5ae43..9545254 100644 --- a/src/subsystems/monitor/mod.rs +++ b/src/subsystems/monitor/mod.rs @@ -11,7 +11,7 @@ use tokio::{ sync::mpsc::{Receiver, channel}, }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use super::handle::Handle; use super::{ShutdownResult, Shutdownable}; @@ -81,6 +81,13 @@ impl IntoSubsystem for MonitorSubsystem { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("MonitorSubsystem received shutdown request."); + subsys.request_local_shutdown(); + trace!("MonitorSubsystem requested local shutdown complete."); + + trace!("MonitorSubsystem waiting for children to shutdown."); + subsys.wait_for_children().await; + trace!("MonitorSubsystem children shutdown complete."); return self.shutdown().await; } _ = self.shutdown.recv() => { @@ -102,9 +109,12 @@ impl IntoSubsystem for MonitorSubsystem { #[async_trait] impl Shutdownable for MonitorSubsystem { async fn shutdown(&mut self) -> ShutdownResult { + trace!("Shutting down MonitorSubsystem..."); // We just have to shut the monitor down manually, // since we have an exclusive lock on it. - self.monitor.shutdown().await + let _ = self.monitor.shutdown().await; + trace!("MonitorSubsystem shut down!"); + Ok(()) } } diff --git a/src/subsystems/platform/mod.rs b/src/subsystems/platform/mod.rs index ef34793..4e3d0ee 100644 --- a/src/subsystems/platform/mod.rs +++ b/src/subsystems/platform/mod.rs @@ -8,7 +8,7 @@ use tokio::{ sync::mpsc::{self, Receiver, channel}, }; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; -use tracing::debug; +use tracing::{debug, trace}; use crate::adapters::BoxedPlatform; @@ -97,6 +97,13 @@ impl IntoSubsystem for PlatformSubsystem { select! { // Shutdown comes first so it has high priority. _ = subsys.on_shutdown_requested() => { + trace!("PlatformSubsystem received shutdown request."); + subsys.request_local_shutdown(); + trace!("PlatformSubsystem requested local shutdown complete."); + + trace!("PlatformSubsystem waiting for children to shutdown."); + subsys.wait_for_children().await; + trace!("PlatformSubsystem children shutdown complete."); return self.shutdown().await; } // Shutdown signal from one of the handles. Since this thread has exclusive diff --git a/src/subsystems/relay/lock_mgmt.rs b/src/subsystems/relay/lock_mgmt.rs index 351d3f1..614c80b 100644 --- a/src/subsystems/relay/lock_mgmt.rs +++ b/src/subsystems/relay/lock_mgmt.rs @@ -8,6 +8,7 @@ use tokio::sync::mpsc::{self, Receiver}; use tokio::sync::oneshot; use tokio::time::{Interval, interval}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; +use tracing::{debug, trace}; use crate::{ Shutdownable, @@ -44,7 +45,9 @@ impl LockManager { ) -> Result { let (done_sender, task_done) = mpsc::channel(1); // Take the initial lock. + trace!("LockManager taking initial lock..."); let locked_state = backend.lock_state(&metadata, &state, done_sender).await?; + trace!("LockManager taking initial lock complete."); let freq = *locked_state.frequency(); let timer = interval(freq / 2); Ok(Self { @@ -79,6 +82,13 @@ impl IntoSubsystem for LockManager { } } _ = subsys.on_shutdown_requested() => { + trace!("LockManager received shutdown request."); + subsys.request_local_shutdown(); + trace!("LockManager requested local shutdown complete."); + + trace!("LockManager waiting for children to shutdown."); + subsys.wait_for_children().await; + trace!("LockManager children shutdown complete."); // Release the lock. return self.shutdown().await; } @@ -94,7 +104,10 @@ impl IntoSubsystem for LockManager { #[async_trait] impl Shutdownable for LockManager { async fn shutdown(&mut self) -> ShutdownResult { + trace!("LockManager shutting down..."); // Release any of the locks we've taken. - self.backend.abandon_lock(&self.meta, &self.state).await + let _ = self.backend.abandon_lock(&self.meta, &self.state).await; + trace!("LockManager shut down!"); + Ok(()) } } diff --git a/src/subsystems/relay/mod.rs b/src/subsystems/relay/mod.rs index 556d5b0..88228ee 100644 --- a/src/subsystems/relay/mod.rs +++ b/src/subsystems/relay/mod.rs @@ -102,7 +102,14 @@ impl IntoSubsystem for RelaySubsystem { select! { // Besides that, we can just hang out. _ = subsys.on_shutdown_requested() => { + trace!("RelaySubsystem received shutdown request."); + subsys.request_local_shutdown(); + trace!("RelaySubsystem requested local shutdown complete."); + // Waiting for children ensure that all of the locks + // we've taken have been released. + trace!("RelaySubsystem waiting for children to shutdown."); subsys.wait_for_children().await; + trace!("RelaySubstsem children shutdown complete"); return Ok(()); } // • When we start the RelaySubsystem, @@ -113,8 +120,7 @@ impl IntoSubsystem for RelaySubsystem { self.backend.upload_observations(&self.meta, batch).await?; } else { // The stream has been closed, so we should shutdown. - debug!("Shutting down in relay"); - subsys.request_shutdown(); + subsys.request_local_shutdown(); } } // • We also need to poll the backend for new states. @@ -172,7 +178,7 @@ impl IntoSubsystem for RelaySubsystem { let percent = WholePercent::try_from(percent_traffic).unwrap(); self.ingress.set_canary_traffic(percent).await?; - locked_state.mark_done().await?; + // locked_state.mark_done().await?; }, RollbackCanary => { // Set traffic to 0 immediately. @@ -188,7 +194,8 @@ impl IntoSubsystem for RelaySubsystem { } } else { // The stream has been closed, so we should shutdown. - subsys.request_shutdown(); + trace!("Shutting down in relay from closed state stream"); + subsys.request_local_shutdown(); } } } diff --git a/src/subsystems/relay/poll_state.rs b/src/subsystems/relay/poll_state.rs index e0454c3..6e071e6 100644 --- a/src/subsystems/relay/poll_state.rs +++ b/src/subsystems/relay/poll_state.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use bon::bon; use miette::{IntoDiagnostic, Report, Result}; use tokio_graceful_shutdown::{IntoSubsystem, SubsystemHandle}; +use tracing::trace; use crate::{ Shutdownable, @@ -70,6 +71,13 @@ impl IntoSubsystem for StatePoller { loop { select! { _ = subsys.on_shutdown_requested() => { + trace!("StatePoller received shutdown request"); + subsys.request_local_shutdown(); + trace!("StatePoller requested local shutdown complete."); + + trace!("StatePoller waiting for children to shutdown."); + subsys.wait_for_children().await; + trace!("StatePoller children shutdown complete."); return self.shutdown().await } _ = self.timer.tick() => { @@ -88,6 +96,7 @@ impl IntoSubsystem for StatePoller { #[async_trait] impl Shutdownable for StatePoller { async fn shutdown(&mut self) -> ShutdownResult { + trace!("StatePoller shut down!"); // Nothing to do! We just stop polling. Ok(()) }