diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 9e32a3d112566..f40fc7f965d57 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -30,7 +30,7 @@ use timely::progress::timestamp::{Refines, Timestamp}; use crate::extensions::arrange::MzArrange; use crate::render::context::{ - ArrangementFlavor, CollectionBundle, Context, SpecializedArrangement, + ArrangementFlavor, CollectionBundle, Context, ShutdownToken, SpecializedArrangement, SpecializedArrangementImport, }; use crate::render::join::mz_join_core::mz_join_core; @@ -72,6 +72,7 @@ impl LinearJoinSpec { &self, arranged1: &Arranged, arranged2: &Arranged, + shutdown_token: ShutdownToken, result: L, ) -> Collection where @@ -95,11 +96,11 @@ impl LinearJoinSpec { } (Materialize, ByWork(limit)) => { let yield_fn = move |_start, work| work >= limit; - mz_join_core(arranged1, arranged2, result, yield_fn) + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } (Materialize, ByTime(limit)) => { let yield_fn = move |start: Instant, _work| start.elapsed() >= limit; - mz_join_core(arranged1, arranged2, result, yield_fn) + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } } } @@ -198,6 +199,7 @@ where inputs[stage_plan.lookup_relation].enter_region(inner), stage_plan, &mut errors, + self.shutdown_token.clone(), self.enable_specialized_arrangements, ); // Update joined results and capture any errors. @@ -256,6 +258,7 @@ fn differential_join( lookup_relation: _, }: LinearStagePlan, errors: &mut Vec>, + shutdown_token: ShutdownToken, _enable_specialized_arrangements: bool, ) -> Collection where @@ -305,15 +308,25 @@ where } JoinedFlavor::Local(local) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_local_local(join_spec, local, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_local_local( + join_spec, + local, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_local_trace( + join_spec, + local, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -321,15 +334,25 @@ where }, JoinedFlavor::Trace(trace) => match arrangement { ArrangementFlavor::Local(oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_trace_local( + join_spec, + trace, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks } ArrangementFlavor::Trace(_gid, oks, errs1) => { - let (oks, errs2) = - dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure); + let (oks, errs2) = dispatch_differential_join_inner_trace_trace( + join_spec, + trace, + oks, + closure, + shutdown_token, + ); errors.push(errs1.as_collection(|k, _v| k.clone())); errors.extend(errs2); oks @@ -344,6 +367,7 @@ fn dispatch_differential_join_inner_local_local( prev_keyed: SpecializedArrangement, next_input: SpecializedArrangement, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -364,6 +388,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowUnit(prev_keyed), @@ -376,6 +401,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), @@ -388,11 +414,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -402,6 +438,7 @@ fn dispatch_differential_join_inner_local_trace( prev_keyed: SpecializedArrangement, next_input: SpecializedArrangementImport, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -423,6 +460,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowUnit(prev_keyed), @@ -435,6 +473,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), @@ -447,11 +486,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangement::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -461,6 +510,7 @@ fn dispatch_differential_join_inner_trace_local( prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangement, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -482,6 +532,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowUnit(prev_keyed), @@ -494,6 +545,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), @@ -506,11 +558,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangement::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -520,6 +582,7 @@ fn dispatch_differential_join_inner_trace_trace( prev_keyed: SpecializedArrangementImport, next_input: SpecializedArrangementImport, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -541,6 +604,7 @@ where Some(vec![]), Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowUnit(prev_keyed), @@ -553,6 +617,7 @@ where Some(vec![]), None, closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), @@ -565,11 +630,21 @@ where None, Some(vec![]), closure, + shutdown_token, ), ( SpecializedArrangementImport::RowRow(prev_keyed), SpecializedArrangementImport::RowRow(next_input), - ) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure), + ) => differential_join_inner( + join_spec, + prev_keyed, + next_input, + None, + None, + None, + closure, + shutdown_token, + ), } } @@ -587,6 +662,7 @@ fn differential_join_inner( prev_types: Option>, next_types: Option>, closure: JoinClosure, + shutdown_token: ShutdownToken, ) -> ( Collection, Option>, @@ -611,18 +687,23 @@ where if closure.could_error() { let (oks, err) = join_spec - .render(&prev_keyed, &next_input, move |key, old, new| { - let key = key.into_row(&mut key_buf, key_types.as_deref()); - let old = old.into_row(&mut old_buf, prev_types.as_deref()); - let new = new.into_row(&mut new_buf, next_types.as_deref()); - - let temp_storage = RowArena::new(); - let mut datums_local = datums.borrow_with_many(&[key, old, new]); - closure - .apply(&mut datums_local, &temp_storage, &mut row_builder) - .map_err(DataflowError::from) - .transpose() - }) + .render( + &prev_keyed, + &next_input, + shutdown_token, + move |key, old, new| { + let key = key.into_row(&mut key_buf, key_types.as_deref()); + let old = old.into_row(&mut old_buf, prev_types.as_deref()); + let new = new.into_row(&mut new_buf, next_types.as_deref()); + + let temp_storage = RowArena::new(); + let mut datums_local = datums.borrow_with_many(&[key, old, new]); + closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .map_err(DataflowError::from) + .transpose() + }, + ) .inner .ok_err(|(x, t, d)| { // TODO(mcsherry): consider `ok_err()` for `Collection`. @@ -634,17 +715,22 @@ where (oks.as_collection(), Some(err.as_collection())) } else { - let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| { - let key = key.into_row(&mut key_buf, key_types.as_deref()); - let old = old.into_row(&mut old_buf, prev_types.as_deref()); - let new = new.into_row(&mut new_buf, next_types.as_deref()); - - let temp_storage = RowArena::new(); - let mut datums_local = datums.borrow_with_many(&[key, old, new]); - closure - .apply(&mut datums_local, &temp_storage, &mut row_builder) - .expect("Closure claimed to never error") - }); + let oks = join_spec.render( + &prev_keyed, + &next_input, + shutdown_token, + move |key, old, new| { + let key = key.into_row(&mut key_buf, key_types.as_deref()); + let old = old.into_row(&mut old_buf, prev_types.as_deref()); + let new = new.into_row(&mut new_buf, next_types.as_deref()); + + let temp_storage = RowArena::new(); + let mut datums_local = datums.borrow_with_many(&[key, old, new]); + closure + .apply(&mut datums_local, &temp_storage, &mut row_builder) + .expect("Closure claimed to never error") + }, + ); (oks, None) } diff --git a/src/compute/src/render/join/mz_join_core.rs b/src/compute/src/render/join/mz_join_core.rs index 95e3f6398e630..8d9a091c080e2 100644 --- a/src/compute/src/render/join/mz_join_core.rs +++ b/src/compute/src/render/join/mz_join_core.rs @@ -57,6 +57,8 @@ use timely::progress::timestamp::Timestamp; use timely::scheduling::Activator; use timely::PartialOrder; +use crate::render::context::ShutdownToken; + /// Joins two arranged collections with the same key type. /// /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, @@ -65,6 +67,7 @@ use timely::PartialOrder; pub(super) fn mz_join_core( arranged1: &Arranged, arranged2: &Arranged, + shutdown_token: ShutdownToken, mut result: L, yield_fn: YFn, ) -> Collection @@ -175,6 +178,23 @@ where let mut input2_buffer = Vec::new(); move |input1, input2, output| { + // If the dataflow is shutting down, discard all existing and future work. + if shutdown_token.in_shutdown() { + // Discard data at the inputs. + input1.for_each(|_cap, _data| ()); + input2.for_each(|_cap, _data| ()); + + // Discard queued work. + todo1 = Default::default(); + todo2 = Default::default(); + + // Stop holding on to input traces. + trace1_option = None; + trace2_option = None; + + return; + } + // 1. Consuming input. // // The join computation repeatedly accepts batches of updates from each of its inputs.