diff --git a/Cargo.lock b/Cargo.lock
index 4b3842469a1fa..6e69ae4208234 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3550,6 +3550,7 @@ dependencies = [
  "prometheus",
  "scopeguard",
  "serde",
+ "smallvec",
  "timely",
  "tokio",
  "tracing",
diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml
index 41b32d9046b34..2e079b4300e74 100644
--- a/src/compute/Cargo.toml
+++ b/src/compute/Cargo.toml
@@ -32,6 +32,7 @@ once_cell = "1.16.0"
 prometheus = { version = "0.13.3", default-features = false }
 scopeguard = "1.1.0"
 serde = { version = "1.0.152", features = ["derive"] }
+smallvec = { version = "1.10.0", features = ["serde", "union"] }
 timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
 tokio = { version = "1.23.0", features = ["fs", "rt", "sync", "net"] }
 tracing = "0.1.37"
diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs
index 1763c8b0d464b..301524da937c8 100644
--- a/src/compute/src/render/top_k.rs
+++ b/src/compute/src/render/top_k.rs
@@ -11,6 +11,8 @@
 //!
 //! Consult [TopKPlan] documentation for details.
 
+use std::collections::HashMap;
+
 use differential_dataflow::hashable::Hashable;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::operators::arrange::ArrangeBySelf;
@@ -19,6 +21,8 @@ use differential_dataflow::operators::Consolidate;
 use differential_dataflow::trace::implementations::ord::OrdValSpine;
 use differential_dataflow::AsCollection;
 use differential_dataflow::Collection;
+use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::operators::Operator;
 use timely::dataflow::Scope;
 
 use mz_compute_client::plan::top_k::{
@@ -56,6 +60,33 @@ where
                 arity,
                 limit,
             }) => {
+                let mut datum_vec = mz_repr::DatumVec::new();
+                let collection = ok_input.map(move |row| {
+                    let group_row = {
+                        let datums = datum_vec.borrow_with(&row);
+                        let iterator = group_key.iter().map(|i| datums[*i]);
+                        let total_size = mz_repr::datums_size(iterator.clone());
+                        let mut group_row = Row::with_capacity(total_size);
+                        group_row.packer().extend(iterator);
+                        group_row
+                    };
+                    (group_row, row)
+                });
+
+                // For monotonic inputs, we are able to thin the input relation in two stages:
+                // 1. First, we can do an intra-timestamp thinning which has the advantage of
+                //    being computed in a streaming fashion, even for the initial snapshot.
+                // 2. Then, we can do inter-timestamp thinning by feeding back negations for
+                //    any records that have been invalidated.
+                let collection = if let Some(limit) = limit {
+                    render_intra_ts_thinning(collection, order_key.clone(), limit)
+                } else {
+                    collection
+                };
+
+                let collection =
+                    collection.map(|(group_row, row)| ((group_row, row.hashed()), row));
+
                 // For monotonic inputs, we are able to retract inputs that can no longer be produced
                 // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                 // outputs, and can be removed. The simplest form of this is when `offset == 0` and
@@ -65,17 +96,24 @@ where
                 // of `offset` and `limit`, discarding only the records not produced in the intermediate
                 // stage.
                 use differential_dataflow::operators::iterate::Variable;
-                let delay = std::time::Duration::from_nanos(10_000_000_000);
+                let delay = std::time::Duration::from_secs(10);
                 let retractions = Variable::new(
                     &mut ok_input.scope(),
                     <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                         delay.try_into().expect("must fit"),
                     ),
                 );
-                let thinned = ok_input.concat(&retractions.negate());
-                let result = build_topk(thinned, group_key, order_key, 0, limit, arity);
-                retractions.set(&ok_input.concat(&result.negate()));
-                result
+                let thinned = collection.concat(&retractions.negate());
+
+                // As an additional optimization, we can skip creating the full topk hierarchy
+                // here since we now have an upper bound on the number of records due to the
+                // intra-ts thinning. The maximum number of records per timestamp is
+                // (num_workers * limit), which we expect to be a small number and so we render
+                // a single topk stage.
+                let result = build_topk_stage(thinned, order_key, 1u64, 0, limit, arity);
+                retractions.set(&collection.concat(&result.negate()));
+
+                result.map(|((_key, _hash), row)| row)
             }
             TopKPlan::Basic(BasicTopKPlan {
                 group_key,
@@ -317,6 +355,154 @@ where
             // TODO(#7331): Here we discard the arranged output.
             result.as_collection(|_k, v| v.clone())
         }
+
+        fn render_intra_ts_thinning<G>(
+            collection: Collection<G, (Row, Row), Diff>,
+            order_key: Vec<mz_expr::ColumnOrder>,
+            limit: usize,
+        ) -> Collection<G, (Row, Row), Diff>
+        where
+            G: Scope,
+            G::Timestamp: Lattice,
+        {
+            let mut aggregates = HashMap::new();
+            let mut vector = Vec::new();
+            collection
+                .inner
+                .unary_notify(
+                    Pipeline,
+                    "TopKIntraTimeThinning",
+                    [],
+                    move |input, output, notificator| {
+                        while let Some((time, data)) = input.next() {
+                            data.swap(&mut vector);
+                            let agg_time = aggregates
+                                .entry(time.time().clone())
+                                .or_insert_with(HashMap::new);
+                            for ((grp_row, row), record_time, diff) in vector.drain(..) {
+                                let monoid = monoids::Top1Monoid {
+                                    row,
+                                    order_key: order_key.clone(),
+                                };
+                                let topk = agg_time.entry((grp_row, record_time)).or_insert_with(
+                                    move || {
+                                        topk_agg::TopKBatch::new(
+                                            limit.try_into().expect("must fit"),
+                                        )
+                                    },
+                                );
+                                topk.update(monoid, diff);
+                            }
+                            notificator.notify_at(time.retain());
+                        }
+
+                        notificator.for_each(|time, _, _| {
+                            if let Some(aggs) = aggregates.remove(time.time()) {
+                                let mut session = output.session(&time);
+                                for ((grp_row, record_time), topk) in aggs {
+                                    session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
+                                        ((grp_row.clone(), monoid.row), record_time.clone(), diff)
+                                    }))
+                                }
+                            }
+                        });
+                    },
+                )
+                .as_collection()
+        }
     }
 }
+
+/// Types for in-place intra-ts aggregation of monotonic streams.
+pub mod topk_agg {
+    use differential_dataflow::consolidation;
+    use smallvec::SmallVec;
+
+    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
+    // that. It would be nice to find a way to reuse some or all of the code from there.
+    //
+    // Additionally, because we're calling into DD's consolidate method we are forced to work with
+    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
+    // for each record. It would be nice to also remove the need for cloning that piece of data.
+    pub struct TopKBatch<T: Ord> {
+        updates: SmallVec<[(T, i64); 16]>,
+        clean: usize,
+        limit: i64,
+    }
+
+    impl<T: Ord> TopKBatch<T> {
+        pub fn new(limit: i64) -> Self {
+            Self {
+                updates: SmallVec::new(),
+                clean: 0,
+                limit,
+            }
+        }
+
+        /// Adds a new update, for `item` with `value`.
+        ///
+        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
+        /// half the length of the list, which would keep the total footprint within reasonable bounds
+        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
+        /// is worth paying without some experimentation.
+        #[inline]
+        pub fn update(&mut self, item: T, value: i64) {
+            self.updates.push((item, value));
+            self.maintain_bounds();
+        }
+
+        /// Compact the internal representation.
+        ///
+        /// This method sorts `self.updates` and consolidates elements with equal items, discarding
+        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
+        /// elements is non-zero.
+        #[inline]
+        pub fn compact(&mut self) {
+            if self.clean < self.updates.len() && self.updates.len() > 1 {
+                let len = consolidation::consolidate_slice(&mut self.updates);
+                self.updates.truncate(len);
+
+                // We can now retain only the first K records and throw away everything else
+                let mut limit = self.limit;
+                self.updates.retain(|x| {
+                    if limit > 0 {
+                        limit -= x.1;
+                        true
+                    } else {
+                        false
+                    }
+                });
+                // By the end of the loop above `limit` will be less than or equal to zero. The
+                // case where it goes negative is when the last record we retained had more copies
+                // than necessary. For this reason we need to do one final adjustment of the diff
+                // field of the last record so that the total sum of the diffs in the batch is K.
+                if let Some(item) = self.updates.last_mut() {
+                    // We are subtracting the limit *negated*, therefore we are subtracting a value
+                    // that is *greater* than or equal to zero, which represents the excess.
+                    item.1 -= -limit;
+                }
+            }
+            self.clean = self.updates.len();
+        }
+
+        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
+        /// This function tries to minimize work by only compacting if enough work has accumulated.
+        fn maintain_bounds(&mut self) {
+            // if we have more than 32 elements and at least half of them are not clean, compact
+            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
+                self.compact()
+            }
+        }
+    }
+
+    impl<T: Ord> IntoIterator for TopKBatch<T> {
+        type Item = (T, i64);
+        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
+
+        fn into_iter(mut self) -> Self::IntoIter {
+            self.compact();
+            self.updates.into_iter()
+        }
+    }
+}
 
diff --git a/test/testdrive/top-k-monotonic.td b/test/testdrive/top-k-monotonic.td
new file mode 100644
index 0000000000000..9e473a1a440c4
--- /dev/null
+++ b/test/testdrive/top-k-monotonic.td
@@ -0,0 +1,53 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Test monotonicity analyses which derive from ENVELOPE NONE sources.
+# Note that these only test the implementation for monotonic sources;
+# they do not test that the analysis doesn't have false positives on
+# non-monotonic sources.
+
+$ set non-dbz-schema={
+    "type": "record",
+    "name": "cpx",
+    "fields": [
+      {"name": "a", "type": "long"},
+      {"name": "b", "type": "long"}
+    ]
+  }
+
+$ kafka-create-topic topic=non-dbz-data
+
+$ kafka-ingest format=avro topic=non-dbz-data schema=${non-dbz-schema} timestamp=1
+{"a": 1, "b": 1}
+{"a": 1, "b": 2}
+{"a": 1, "b": 3}
+{"a": 1, "b": 4}
+{"a": 1, "b": 5}
+{"a": 2, "b": 1000}
+{"a": 2, "b": 1001}
+{"a": 2, "b": 1002}
+{"a": 2, "b": 1003}
+{"a": 2, "b": 1004}
+
+> CREATE CONNECTION kafka_conn
+  TO KAFKA (BROKER '${testdrive.kafka-addr}');
+
+> CREATE SOURCE non_dbz_data
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')
+  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
+  ENVELOPE NONE
+
+# Create a monotonic topk plan that has both a limit and a group to test that thinning works as expected
+> SELECT * FROM (SELECT DISTINCT a FROM non_dbz_data) grp, LATERAL (SELECT b FROM non_dbz_data WHERE a = grp.a ORDER BY b LIMIT 2);
+a b
+---------
+1 1
+1 2
+2 1000
+2 1001
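
To make the intra-timestamp thinning concrete, here is a minimal, self-contained sketch (not part of the patch) of the retain-and-adjust compaction that `topk_agg::TopKBatch::compact` performs for each (group, timestamp) pair. The standalone `compact` helper and the sample values below are illustrative only: the merge step stands in for differential-dataflow's `consolidate_slice`, and a small guard is added for the case where the batch holds fewer than `limit` copies in total.

// Illustrative sketch: keep at most `limit` copies of the smallest items,
// trimming the diff of the last retained record if it overshoots.
fn compact<T: Ord>(updates: &mut Vec<(T, i64)>, limit: i64) {
    // Sort by item and merge equal items by summing their diffs, then drop
    // anything that accumulates to zero. This stands in for the call to
    // differential's `consolidate_slice` in the patch.
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    updates.dedup_by(|a, b| {
        if a.0 == b.0 {
            b.1 += a.1;
            true
        } else {
            false
        }
    });
    updates.retain(|x| x.1 != 0);

    // Keep records until `limit` copies are accounted for; anything after
    // that can never appear in the top K output for this (group, time).
    let mut remaining = limit;
    updates.retain(|x| {
        if remaining > 0 {
            remaining -= x.1;
            true
        } else {
            false
        }
    });

    // The last retained record may carry more copies than needed, in which
    // case `remaining` went negative; trim the excess so the batch sums to
    // at most `limit` copies.
    if remaining < 0 {
        if let Some(last) = updates.last_mut() {
            last.1 -= -remaining;
        }
    }
}

fn main() {
    // Five copies of item 1 plus one copy each of 2 and 3; with a limit of 3
    // only three copies of item 1 survive.
    let mut batch: Vec<(u64, i64)> = vec![(1, 2), (3, 1), (1, 3), (2, 1)];
    compact(&mut batch, 3);
    assert_eq!(batch, vec![(1, 3)]);
}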