diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 07bf67a1225fb..79d56ef3c584a 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -94,8 +94,8 @@ where pub until: Antichain, /// Bindings of identifiers to collections. pub bindings: BTreeMap>, - /// An optional token that operators can probe to know whether the dataflow is shutting down. - pub shutdown_token: Option<Weak<()>>, + /// A token that operators can probe to know whether the dataflow is shutting down. + pub(super) shutdown_token: ShutdownToken, } impl Context @@ -121,7 +121,7 @@ where as_of_frontier, until: dataflow.until.clone(), bindings: BTreeMap::new(), - shutdown_token: None, + shutdown_token: Default::default(), } } } @@ -175,6 +175,42 @@ where } } +/// Convenient wrapper around an optional `Weak` instance that can be used to check whether a +/// dataflow is shutting down. +/// +/// Instances created through the `Default` impl act as if the dataflow never shuts down. +/// Instances created through [`ShutdownToken::new`] defer to the wrapped token. +#[derive(Clone, Default)] +pub(super) struct ShutdownToken(Option<Weak<()>>); + +impl ShutdownToken { + /// Construct a `ShutdownToken` instance that defers to `token`. + pub(super) fn new(token: Weak<()>) -> Self { + Self(Some(token)) + } + + /// Probe the token for dataflow shutdown. + /// + /// This method is meant to be used with the `?` operator: It returns `None` if the dataflow is + /// in the process of shutting down and `Some` otherwise. + pub(super) fn probe(&self) -> Option<()> { + match &self.0 { + Some(t) => t.upgrade().map(|_| ()), + None => Some(()), + } + } + + /// Returns whether the dataflow is in the process of shutting down. + pub(super) fn in_shutdown(&self) -> bool { + self.probe().is_none() + } + + /// Returns a reference to the wrapped `Weak`. 
+ pub(crate) fn get_inner(&self) -> Option<&Weak<()>> { + self.0.as_ref() + } +} + /// Describes flavor of arrangement: local or imported trace. #[derive(Clone)] pub enum ArrangementFlavor diff --git a/src/compute/src/render/errors.rs b/src/compute/src/render/errors.rs index fcc57067f0329..26d36d19fb857 100644 --- a/src/compute/src/render/errors.rs +++ b/src/compute/src/render/errors.rs @@ -10,12 +10,14 @@ //! Helpers for handling errors encountered by operators. use std::hash::Hash; -use std::rc::Weak; use differential_dataflow::ExchangeData; -use mz_repr::Row; use timely::container::columnation::Columnation; +use mz_repr::Row; + +use super::context::ShutdownToken; + /// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`, /// specialized for use in compute. Validation code will only run when the error constructor is /// Some. @@ -77,25 +79,18 @@ where /// the process of shutting down. #[derive(Clone)] pub(super) struct ErrorLogger { - token: Option>, + token: ShutdownToken, dataflow_name: String, } impl ErrorLogger { - pub fn new(token: Option>, dataflow_name: String) -> Self { + pub fn new(token: ShutdownToken, dataflow_name: String) -> Self { Self { token, dataflow_name, } } - fn token_alive(&self) -> bool { - match &self.token { - Some(t) => t.upgrade().is_some(), - None => true, - } - } - /// Log the given error, unless the dataflow is shutting down. /// /// The logging format is optimized for surfacing errors with Sentry: @@ -113,7 +108,7 @@ impl ErrorLogger { /// // TODO(#18214): Rethink or justify our error logging strategy. 
pub fn log(&self, message: &'static str, details: &str) { - if self.token_alive() { + if !self.token.in_shutdown() { self.log_always(message, details); } } diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 8fc13b3ad562a..59291233d4ce5 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -12,6 +12,7 @@ //! Consult [DeltaJoinPlan] documentation for details. #![allow(clippy::op_ref)] + use std::collections::{BTreeMap, BTreeSet}; use timely::dataflow::Scope; @@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena}; use mz_storage_client::types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; -use crate::render::context::{ArrangementFlavor, CollectionBundle, Context}; +use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken}; impl Context where @@ -198,6 +199,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -207,6 +209,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -219,6 +222,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -228,6 +232,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -314,6 +319,7 @@ fn build_halfjoin( prev_thinning: Vec, comparison: CF, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Collection, @@ -364,6 +370,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. 
+ shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder); @@ -396,6 +406,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs index 72fe9baa9c5bf..1d1294c2cf43a 100644 --- a/src/compute/src/render/mod.rs +++ b/src/compute/src/render/mod.rs @@ -137,6 +137,8 @@ use crate::typedefs::{ErrSpine, RowKeySpine}; pub use context::CollectionBundle; use context::{ArrangementFlavor, Context}; +use self::context::ShutdownToken; + pub mod context; mod errors; mod flat_map; @@ -299,7 +301,7 @@ pub fn build_compute_dataflow( // Build declared objects. for object in dataflow.objects_to_build { let object_token = Rc::new(()); - context.shutdown_token = Some(Rc::downgrade(&object_token)); + context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token)); tokens.insert(object.id, object_token); let bundle = context.render_recursive_plan(0, object.plan); @@ -353,7 +355,7 @@ pub fn build_compute_dataflow( // Build declared objects. for object in dataflow.objects_to_build { let object_token = Rc::new(()); - context.shutdown_token = Some(Rc::downgrade(&object_token)); + context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token)); tokens.insert(object.id, object_token); context.build_object(object); @@ -651,7 +653,7 @@ where // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point. 
let mut oks = oks.consolidate_named::>("LetRecConsolidation"); - if let Some(token) = &self.shutdown_token { + if let Some(token) = &self.shutdown_token.get_inner() { oks = oks.with_token(Weak::clone(token)); } oks_v.set(&oks); @@ -668,7 +670,7 @@ where move |_k, _s, t| t.push(((), 1)), ) .as_collection(|k, _| k.clone()); - if let Some(token) = &self.shutdown_token { + if let Some(token) = &self.shutdown_token.get_inner() { errs = errs.with_token(Weak::clone(token)); } err_v.set(&errs);