Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 124 additions & 38 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use timely::progress::timestamp::{Refines, Timestamp};

use crate::extensions::arrange::MzArrange;
use crate::render::context::{
ArrangementFlavor, CollectionBundle, Context, SpecializedArrangement,
ArrangementFlavor, CollectionBundle, Context, ShutdownToken, SpecializedArrangement,
SpecializedArrangementImport,
};
use crate::render::join::mz_join_core::mz_join_core;
Expand Down Expand Up @@ -72,6 +72,7 @@ impl LinearJoinSpec {
&self,
arranged1: &Arranged<G, Tr1>,
arranged2: &Arranged<G, Tr2>,
shutdown_token: ShutdownToken,
result: L,
) -> Collection<G, I::Item, Diff>
where
Expand All @@ -95,11 +96,11 @@ impl LinearJoinSpec {
}
(Materialize, ByWork(limit)) => {
let yield_fn = move |_start, work| work >= limit;
mz_join_core(arranged1, arranged2, result, yield_fn)
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
(Materialize, ByTime(limit)) => {
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
mz_join_core(arranged1, arranged2, result, yield_fn)
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
}
}
Expand Down Expand Up @@ -198,6 +199,7 @@ where
inputs[stage_plan.lookup_relation].enter_region(inner),
stage_plan,
&mut errors,
self.shutdown_token.clone(),
self.enable_specialized_arrangements,
);
// Update joined results and capture any errors.
Expand Down Expand Up @@ -256,6 +258,7 @@ fn differential_join<G, T>(
lookup_relation: _,
}: LinearStagePlan,
errors: &mut Vec<Collection<G, DataflowError, Diff>>,
shutdown_token: ShutdownToken,
_enable_specialized_arrangements: bool,
) -> Collection<G, Row, Diff>
where
Expand Down Expand Up @@ -305,31 +308,51 @@ where
}
JoinedFlavor::Local(local) => match arrangement {
ArrangementFlavor::Local(oks, errs1) => {
let (oks, errs2) =
dispatch_differential_join_inner_local_local(join_spec, local, oks, closure);
let (oks, errs2) = dispatch_differential_join_inner_local_local(
join_spec,
local,
oks,
closure,
shutdown_token,
);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
ArrangementFlavor::Trace(_gid, oks, errs1) => {
let (oks, errs2) =
dispatch_differential_join_inner_local_trace(join_spec, local, oks, closure);
let (oks, errs2) = dispatch_differential_join_inner_local_trace(
join_spec,
local,
oks,
closure,
shutdown_token,
);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
},
JoinedFlavor::Trace(trace) => match arrangement {
ArrangementFlavor::Local(oks, errs1) => {
let (oks, errs2) =
dispatch_differential_join_inner_trace_local(join_spec, trace, oks, closure);
let (oks, errs2) = dispatch_differential_join_inner_trace_local(
join_spec,
trace,
oks,
closure,
shutdown_token,
);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
}
ArrangementFlavor::Trace(_gid, oks, errs1) => {
let (oks, errs2) =
dispatch_differential_join_inner_trace_trace(join_spec, trace, oks, closure);
let (oks, errs2) = dispatch_differential_join_inner_trace_trace(
join_spec,
trace,
oks,
closure,
shutdown_token,
);
errors.push(errs1.as_collection(|k, _v| k.clone()));
errors.extend(errs2);
oks
Expand All @@ -344,6 +367,7 @@ fn dispatch_differential_join_inner_local_local<G>(
prev_keyed: SpecializedArrangement<G>,
next_input: SpecializedArrangement<G>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -364,6 +388,7 @@ where
Some(vec![]),
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangement::RowUnit(prev_keyed),
Expand All @@ -376,6 +401,7 @@ where
Some(vec![]),
None,
closure,
shutdown_token,
),
(
SpecializedArrangement::RowRow(prev_keyed),
Expand All @@ -388,11 +414,21 @@ where
None,
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangement::RowRow(prev_keyed),
SpecializedArrangement::RowRow(next_input),
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
) => differential_join_inner(
join_spec,
prev_keyed,
next_input,
None,
None,
None,
closure,
shutdown_token,
),
}
}

Expand All @@ -402,6 +438,7 @@ fn dispatch_differential_join_inner_local_trace<G, T>(
prev_keyed: SpecializedArrangement<G>,
next_input: SpecializedArrangementImport<G, T>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -423,6 +460,7 @@ where
Some(vec![]),
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangement::RowUnit(prev_keyed),
Expand All @@ -435,6 +473,7 @@ where
Some(vec![]),
None,
closure,
shutdown_token,
),
(
SpecializedArrangement::RowRow(prev_keyed),
Expand All @@ -447,11 +486,21 @@ where
None,
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangement::RowRow(prev_keyed),
SpecializedArrangementImport::RowRow(next_input),
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
) => differential_join_inner(
join_spec,
prev_keyed,
next_input,
None,
None,
None,
closure,
shutdown_token,
),
}
}

Expand All @@ -461,6 +510,7 @@ fn dispatch_differential_join_inner_trace_local<G, T>(
prev_keyed: SpecializedArrangementImport<G, T>,
next_input: SpecializedArrangement<G>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -482,6 +532,7 @@ where
Some(vec![]),
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowUnit(prev_keyed),
Expand All @@ -494,6 +545,7 @@ where
Some(vec![]),
None,
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowRow(prev_keyed),
Expand All @@ -506,11 +558,21 @@ where
None,
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowRow(prev_keyed),
SpecializedArrangement::RowRow(next_input),
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
) => differential_join_inner(
join_spec,
prev_keyed,
next_input,
None,
None,
None,
closure,
shutdown_token,
),
}
}

Expand All @@ -520,6 +582,7 @@ fn dispatch_differential_join_inner_trace_trace<G, T>(
prev_keyed: SpecializedArrangementImport<G, T>,
next_input: SpecializedArrangementImport<G, T>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -541,6 +604,7 @@ where
Some(vec![]),
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowUnit(prev_keyed),
Expand All @@ -553,6 +617,7 @@ where
Some(vec![]),
None,
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowRow(prev_keyed),
Expand All @@ -565,11 +630,21 @@ where
None,
Some(vec![]),
closure,
shutdown_token,
),
(
SpecializedArrangementImport::RowRow(prev_keyed),
SpecializedArrangementImport::RowRow(next_input),
) => differential_join_inner(join_spec, prev_keyed, next_input, None, None, None, closure),
) => differential_join_inner(
join_spec,
prev_keyed,
next_input,
None,
None,
None,
closure,
shutdown_token,
),
}
}

Expand All @@ -587,6 +662,7 @@ fn differential_join_inner<G, T, Tr1, Tr2, K, V1, V2>(
prev_types: Option<Vec<ColumnType>>,
next_types: Option<Vec<ColumnType>>,
closure: JoinClosure,
shutdown_token: ShutdownToken,
) -> (
Collection<G, Row, Diff>,
Option<Collection<G, DataflowError, Diff>>,
Expand All @@ -611,18 +687,23 @@ where

if closure.could_error() {
let (oks, err) = join_spec
.render(&prev_keyed, &next_input, move |key, old, new| {
let key = key.into_row(&mut key_buf, key_types.as_deref());
let old = old.into_row(&mut old_buf, prev_types.as_deref());
let new = new.into_row(&mut new_buf, next_types.as_deref());

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
.apply(&mut datums_local, &temp_storage, &mut row_builder)
.map_err(DataflowError::from)
.transpose()
})
.render(
&prev_keyed,
&next_input,
shutdown_token,
move |key, old, new| {
let key = key.into_row(&mut key_buf, key_types.as_deref());
let old = old.into_row(&mut old_buf, prev_types.as_deref());
let new = new.into_row(&mut new_buf, next_types.as_deref());

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
.apply(&mut datums_local, &temp_storage, &mut row_builder)
.map_err(DataflowError::from)
.transpose()
},
)
.inner
.ok_err(|(x, t, d)| {
// TODO(mcsherry): consider `ok_err()` for `Collection`.
Expand All @@ -634,17 +715,22 @@ where

(oks.as_collection(), Some(err.as_collection()))
} else {
let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| {
let key = key.into_row(&mut key_buf, key_types.as_deref());
let old = old.into_row(&mut old_buf, prev_types.as_deref());
let new = new.into_row(&mut new_buf, next_types.as_deref());

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
.apply(&mut datums_local, &temp_storage, &mut row_builder)
.expect("Closure claimed to never error")
});
let oks = join_spec.render(
&prev_keyed,
&next_input,
shutdown_token,
move |key, old, new| {
let key = key.into_row(&mut key_buf, key_types.as_deref());
let old = old.into_row(&mut old_buf, prev_types.as_deref());
let new = new.into_row(&mut new_buf, next_types.as_deref());

let temp_storage = RowArena::new();
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
closure
.apply(&mut datums_local, &temp_storage, &mut row_builder)
.expect("Closure claimed to never error")
},
);

(oks, None)
}
Expand Down
Loading