From ff5f3a84f9d4835512ea45083418dcfdb7018ebc Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 20 Apr 2023 16:47:22 +0200 Subject: [PATCH 1/2] compute: `ShutdownToken` wrapper --- src/compute/src/render/context.rs | 42 ++++++++++++++++++++++++++++--- src/compute/src/render/errors.rs | 19 ++++++-------- src/compute/src/render/mod.rs | 10 +++++--- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 07bf67a1225fb..79d56ef3c584a 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -94,8 +94,8 @@ where pub until: Antichain, /// Bindings of identifiers to collections. pub bindings: BTreeMap>, - /// An optional token that operators can probe to know whether the dataflow is shutting down. - pub shutdown_token: Option<Weak<()>>, + /// A token that operators can probe to know whether the dataflow is shutting down. + pub(super) shutdown_token: ShutdownToken, } impl Context @@ -121,7 +121,7 @@ where as_of_frontier, until: dataflow.until.clone(), bindings: BTreeMap::new(), - shutdown_token: None, + shutdown_token: Default::default(), } } } @@ -175,6 +175,42 @@ where } } +/// Convenient wrapper around an optional `Weak` instance that can be used to check whether a +/// dataflow is shutting down. +/// +/// Instances created through the `Default` impl act as if the dataflow never shuts down. +/// Instances created through [`ShutdownToken::new`] defer to the wrapped token. +#[derive(Clone, Default)] +pub(super) struct ShutdownToken(Option<Weak<()>>); + +impl ShutdownToken { + /// Construct a `ShutdownToken` instance that defers to `token`. + pub(super) fn new(token: Weak<()>) -> Self { + Self(Some(token)) + } + + /// Probe the token for dataflow shutdown. + /// + /// This method is meant to be used with the `?` operator: It returns `None` if the dataflow is + /// in the process of shutting down and `Some` otherwise. 
+ pub(super) fn probe(&self) -> Option<()> { + match &self.0 { + Some(t) => t.upgrade().map(|_| ()), + None => Some(()), + } + } + + /// Returns whether the dataflow is in the process of shutting down. + pub(super) fn in_shutdown(&self) -> bool { + self.probe().is_none() + } + + /// Returns a reference to the wrapped `Weak`. + pub(crate) fn get_inner(&self) -> Option<&Weak<()>> { + self.0.as_ref() + } +} + /// Describes flavor of arrangement: local or imported trace. #[derive(Clone)] pub enum ArrangementFlavor diff --git a/src/compute/src/render/errors.rs b/src/compute/src/render/errors.rs index fcc57067f0329..26d36d19fb857 100644 --- a/src/compute/src/render/errors.rs +++ b/src/compute/src/render/errors.rs @@ -10,12 +10,14 @@ //! Helpers for handling errors encountered by operators. use std::hash::Hash; -use std::rc::Weak; use differential_dataflow::ExchangeData; -use mz_repr::Row; use timely::container::columnation::Columnation; +use mz_repr::Row; + +use super::context::ShutdownToken; + /// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`, /// specialized for use in compute. Validation code will only run when the error constructor is /// Some. @@ -77,25 +79,18 @@ where /// the process of shutting down. #[derive(Clone)] pub(super) struct ErrorLogger { - token: Option<Weak<()>>, + token: ShutdownToken, dataflow_name: String, } impl ErrorLogger { - pub fn new(token: Option<Weak<()>>, dataflow_name: String) -> Self { + pub fn new(token: ShutdownToken, dataflow_name: String) -> Self { Self { token, dataflow_name, } } - fn token_alive(&self) -> bool { - match &self.token { - Some(t) => t.upgrade().is_some(), - None => true, - } - } - /// Log the given error, unless the dataflow is shutting down. /// /// The logging format is optimized for surfacing errors with Sentry: @@ -113,7 +108,7 @@ impl ErrorLogger { /// // TODO(#18214): Rethink or justify our error logging strategy. 
pub fn log(&self, message: &'static str, details: &str) { - if self.token_alive() { + if !self.token.in_shutdown() { self.log_always(message, details); } } diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs index 72fe9baa9c5bf..1d1294c2cf43a 100644 --- a/src/compute/src/render/mod.rs +++ b/src/compute/src/render/mod.rs @@ -137,6 +137,8 @@ use crate::typedefs::{ErrSpine, RowKeySpine}; pub use context::CollectionBundle; use context::{ArrangementFlavor, Context}; +use self::context::ShutdownToken; + pub mod context; mod errors; mod flat_map; @@ -299,7 +301,7 @@ pub fn build_compute_dataflow( // Build declared objects. for object in dataflow.objects_to_build { let object_token = Rc::new(()); - context.shutdown_token = Some(Rc::downgrade(&object_token)); + context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token)); tokens.insert(object.id, object_token); let bundle = context.render_recursive_plan(0, object.plan); @@ -353,7 +355,7 @@ pub fn build_compute_dataflow( // Build declared objects. for object in dataflow.objects_to_build { let object_token = Rc::new(()); - context.shutdown_token = Some(Rc::downgrade(&object_token)); + context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token)); tokens.insert(object.id, object_token); context.build_object(object); @@ -651,7 +653,7 @@ where // Set oks variable to `oks` but consolidated to ensure iteration ceases at fixed point. 
 let mut oks = oks.consolidate_named::>("LetRecConsolidation"); - if let Some(token) = &self.shutdown_token { + if let Some(token) = &self.shutdown_token.get_inner() { oks = oks.with_token(Weak::clone(token)); } oks_v.set(&oks); @@ -668,7 +670,7 @@ where move |_k, _s, t| t.push(((), 1)), ) .as_collection(|k, _| k.clone()); - if let Some(token) = &self.shutdown_token { + if let Some(token) = &self.shutdown_token.get_inner() { errs = errs.with_token(Weak::clone(token)); } err_v.set(&errs); From 00f11ab96462a1bfe2670a6a5ec5dd3e17024569 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 24 Apr 2023 16:13:18 +0200 Subject: [PATCH 2/2] compute: tokenize delta join closure This commit adds a shutdown token check to the closure we pass to the delta join operator. When the dataflow is shutting down, this makes the join closure drain all input data, rather than processing it. As a result, delta join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. --- src/compute/src/render/join/delta_join.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 8fc13b3ad562a..59291233d4ce5 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -12,6 +12,7 @@ //! Consult [DeltaJoinPlan] documentation for details. 
#![allow(clippy::op_ref)] + use std::collections::{BTreeMap, BTreeSet}; use timely::dataflow::Scope; @@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena}; use mz_storage_client::types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; -use crate::render::context::{ArrangementFlavor, CollectionBundle, Context}; +use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken}; impl Context where @@ -198,6 +199,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -207,6 +209,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -219,6 +222,7 @@ where stream_thinning, |t1, t2| t1.le(t2), closure, + self.shutdown_token.clone(), ) } else { build_halfjoin( @@ -228,6 +232,7 @@ where stream_thinning, |t1, t2| t1.lt(t2), closure, + self.shutdown_token.clone(), ) } } @@ -314,6 +319,7 @@ fn build_halfjoin( prev_thinning: Vec, comparison: CF, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Collection, @@ -364,6 +370,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. + shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder); @@ -396,6 +406,10 @@ where |_timer, count| count > 1_000_000, // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use. move |key, stream_row, lookup_row, initial, time, diff1, diff2| { + // Check the shutdown token to avoid doing unnecessary work when the dataflow is + // shutting down. 
+ shutdown_token.probe()?; + let temp_storage = RowArena::new(); let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]); let row = closure