From 4c7e2409047e9efac0aa499e0fa9eeee35ce1746 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Mon, 17 Jul 2023 15:16:40 +0200 Subject: [PATCH] compute: tokenize mz_join_core This commit adds a shutdown token check to the `mz_join_core` linear join implementation. When the dataflow is shutting down, this makes the operator discard all its existing work and new input data, rather than processing it. As a result, differential join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators. Unfortunately, we can't make the same change for the DD join operator. We could add a token check into the result closure we pass to that operator, but the shutdown check would interfere with the fueling of the DD join operator. Fuel is consumed based on the number of updates emitted. When the token is dropped, the join closure stops producing updates, which means the operator stops consuming fuel, so it does not yield anymore until it has drained all its inputs. If there are many inputs left, the replica may not accept commands for potentially quite a long time. --- src/compute/src/render/join/linear_join.rs | 162 +++++++++++++++----- src/compute/src/render/join/mz_join_core.rs | 20 +++ 2 files changed, 144 insertions(+), 38 deletions(-) diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 9e32a3d112566..f40fc7f965d57 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -30,7 +30,7 @@ use timely::progress::timestamp::{Refines, Timestamp}; use crate::extensions::arrange::MzArrange; use crate::render::context::{ - ArrangementFlavor, CollectionBundle, Context, SpecializedArrangement, + ArrangementFlavor, CollectionBundle, Context, ShutdownToken, SpecializedArrangement, SpecializedArrangementImport, }; use crate::render::join::mz_join_core::mz_join_core; @@ -72,6 +72,7 @@ impl LinearJoinSpec { &self, arranged1: &Arranged, arranged2: &Arranged, + shutdown_token: ShutdownToken, result: L, ) -> Collection where @@ -95,11 +96,11 @@ impl LinearJoinSpec { } (Materialize, ByWork(limit)) => { let yield_fn = move |_start, work| work >= limit; - mz_join_core(arranged1, arranged2, result, yield_fn) + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } (Materialize, ByTime(limit)) => { let yield_fn = move |start: Instant, _work| start.elapsed() >= limit; - mz_join_core(arranged1, arranged2, result, yield_fn) + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } } } @@ -198,6 +199,7 @@ where inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, &mut errors, + self.shutdown_token.clone(), self.enable_specialized_arrangements, ); // Update joined results and capture any errors. @@ -256,6 +258,7 @@ fn differential_join( lookup_relation: _, }: LinearStagePlan, errors: &mut Vec>, + shutdown_token: ShutdownToken, _enable_specialized_arrangements: bool, ) -> Collection where @@ -305,15 +308,25 @@ where } JoinedFlavor::Local(local) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_local_local(join_spec, local, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_local_local( + join_spec, + local, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_local_trace( + join_spec, + local, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -321,15 +334,25 @@ where }, JoinedFlavor::Trace(trace) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_trace_local( + join_spec, + trace, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_trace_trace( + join_spec, + trace, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -344,6 +367,7 @@ fn dispatch_differential_join_inner_local_local( prev_keyed: SpecializedArrangement, next_input: SpecializedArrangement, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -364,6 +388,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowUnit(prev_keyed), @@ -376,6 +401,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), @@ -388,11 +414,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -402,6 +438,7 @@ fn dispatch_differential_join_inner_local_trace( prev_keyed: SpecializedArrangement, next_input: SpecializedArrangementImport, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -423,6 +460,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowUnit(prev_keyed), @@ -435,6 +473,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), @@ -447,11 +486,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -461,6 +510,7 @@ fn dispatch_differential_join_inner_trace_local( prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangement, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -482,6 +532,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowUnit(prev_keyed), @@ -494,6 +545,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), @@ -506,11 +558,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -520,6 +582,7 @@ fn dispatch_differential_join_inner_trace_trace( prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangementImport, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -541,6 +604,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowUnit(prev_keyed), @@ -553,6 +617,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), @@ -565,11 +630,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -587,6 +662,7 @@ fn differential_join_inner( prev_types: Option>, next_types: Option>, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -611,18 +687,23 @@ where if closure.could_error() { let (oks, err) = join_spec - .render(&prev_keyed, &next_input, move |key, old, new| { - let key = key.into_row(&mut key_buf, key_types.as_deref()); - let old = old.into_row(&mut old_buf, prev_types.as_deref()); - let new = new.into_row(&mut new_buf, next_types.as_deref()); - - let temp_storage = RowArena::new(); - let mut datums_local = datums.borrow_with_many(&[key, old, new]); - closure - .apply(&mut datums_local, &temp_storage, &mut row_builder) - .map_err(DataflowError::from) - .transpose() - }) + .render( + &prev_keyed, + &next_input, + shutdown_token, + move |key, old, new| { + let key = key.into_row(&mut key_buf, key_types.as_deref()); + let old = old.into_row(&mut old_buf, prev_types.as_deref()); + let new = new.into_row(&mut new_buf, next_types.as_deref()); + + let temp_storage = RowArena::new(); + let mut datums_local = datums.borrow_with_many(&[key, old, new]); + closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .map_err(DataflowError::from) + .transpose() + }, + ) .inner .ok_err(|(x, t, d)| { // TODO(mcsherry): consider `ok_err()` for `Collection`. @@ -634,17 +715,22 @@ where (oks.as_collection(), Some(err.as_collection())) } else { - let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| { - let key = key.into_row(&mut key_buf, key_types.as_deref()); - let old = old.into_row(&mut old_buf, prev_types.as_deref()); - let new = new.into_row(&mut new_buf, next_types.as_deref()); - - let temp_storage = RowArena::new(); - let mut datums_local = datums.borrow_with_many(&[key, old, new]); - closure - .apply(&mut datums_local, &temp_storage, &mut row_builder) - .expect("Closure claimed to never error") - }); + let oks = join_spec.render( + &prev_keyed, + &next_input, + shutdown_token, + move |key, old, new| { + let key = key.into_row(&mut key_buf, key_types.as_deref()); + let old = old.into_row(&mut old_buf, prev_types.as_deref()); + let new = new.into_row(&mut new_buf, next_types.as_deref()); + + let temp_storage = RowArena::new(); + let mut datums_local = datums.borrow_with_many(&[key, old, new]); + closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .expect("Closure claimed to never error") + }, + ); (oks, None) } diff --git a/src/compute/src/render/join/mz_join_core.rs b/src/compute/src/render/join/mz_join_core.rs index 95e3f6398e630..8d9a091c080e2 100644 --- a/src/compute/src/render/join/mz_join_core.rs +++ b/src/compute/src/render/join/mz_join_core.rs @@ -57,6 +57,8 @@ use timely::progress::timestamp::Timestamp; use timely::scheduling::Activator; use timely::PartialOrder; +use crate::render::context::ShutdownToken; + /// Joins two arranged collections with the same key type. /// /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, @@ -65,6 +67,7 @@ use timely::PartialOrder; pub(super) fn mz_join_core( arranged1: &Arranged, arranged2: &Arranged, + shutdown_token: ShutdownToken, mut result: L, yield_fn: YFn, ) -> Collection @@ -175,6 +178,23 @@ where let mut input2_buffer = Vec::new(); move |input1, input2, output| { + // If the dataflow is shutting down, discard all existing and future work. + if shutdown_token.in_shutdown() { + // Discard data at the inputs. + input1.for_each(|_cap, _data| ()); + input2.for_each(|_cap, _data| ()); + + // Discard queued work. + todo1 = Default::default(); + todo2 = Default::default(); + + // Stop holding on to input traces. + trace1_option = None; + trace2_option = None; + + return; + } + // 1. Consuming input. // // The join computation repeatedly accepts batches of updates from each of its inputs.