diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index a49136e2d..db7a876bc 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -6,7 +6,6 @@ use timely::scheduling::Scheduler; use differential_dataflow::input::Input; use differential_dataflow::AsCollection; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::join::JoinCore; use differential_dataflow::operators::Iterate; diff --git a/differential-dataflow/examples/cursors.rs b/differential-dataflow/examples/cursors.rs index 032318acc..4855ae7ef 100644 --- a/differential-dataflow/examples/cursors.rs +++ b/differential-dataflow/examples/cursors.rs @@ -38,7 +38,6 @@ use timely::progress::frontier::AntichainRef; use timely::dataflow::operators::Probe; use differential_dataflow::input::Input; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::TraceReader; diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 668836600..0a2e95663 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -10,7 +10,6 @@ use timely::dataflow::scopes::ScopeParent; use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; -use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::operators::iterate::VecVariable; type Node = usize; diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index 89d8ac046..a0134d7e2 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -6,9 +6,6 @@ use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeBySelf; -use differential_dataflow::operators::arrange::ArrangeByKey; - use graph_map::GraphMMap; type Node = u32; diff --git a/differential-dataflow/examples/itembased_cf.rs b/differential-dataflow/examples/itembased_cf.rs index 45859f23f..0dd05eb1b 100644 --- a/differential-dataflow/examples/itembased_cf.rs +++ b/differential-dataflow/examples/itembased_cf.rs @@ -1,6 +1,5 @@ use differential_dataflow::input::InputSession; use differential_dataflow::operators::{Join,CountTotal}; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::join::JoinCore; use rand::{Rng, SeedableRng, StdRng}; diff --git a/differential-dataflow/examples/multitemporal.rs b/differential-dataflow/examples/multitemporal.rs index 84f48ed22..8a0140ae7 100644 --- a/differential-dataflow/examples/multitemporal.rs +++ b/differential-dataflow/examples/multitemporal.rs @@ -7,7 +7,6 @@ use timely::progress::frontier::AntichainRef; use timely::PartialOrder; use differential_dataflow::AsCollection; -use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::trace::{Cursor, TraceReader}; use pair::Pair; diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 39703891b..baadde341 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -14,7 +14,6 @@ where G: Scope, N: ExchangeData+Hash, { - use crate::operators::arrange::arrangement::ArrangeByKey; let edges = edges.arrange_by_key(); bfs_arranged(&edges, roots) } diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 3464e4daf..387fdbe20 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -25,7 +25,6 @@ where G: Scope, N: ExchangeData+Hash, { - use crate::operators::arrange::arrangement::ArrangeByKey; let forward = edges.arrange_by_key(); let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key(); bidijkstra_arranged(&forward, &reverse, goals) diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 24d4b06cf..635bb38e9 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -7,7 +7,6 @@ use timely::dataflow::*; use crate::{VecCollection, ExchangeData}; use crate::lattice::Lattice; use crate::difference::{Abelian, Multiply}; -use crate::operators::arrange::arrangement::ArrangeByKey; /// Propagates labels forward, retaining the minimum label. /// diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 7164176e7..6b0c53962 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -746,7 +746,6 @@ pub mod vec { /// As `reduce` with the ability to name the operator. pub fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - use crate::operators::arrange::arrangement::ArrangeByKey; use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -803,7 +802,6 @@ pub mod vec { Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { - use crate::operators::arrange::arrangement::ArrangeByKey; self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_core::<_,Bu,_>(name, logic) } @@ -867,7 +865,6 @@ pub mod vec { /// A `threshold` with the ability to name the operator. pub fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - use crate::operators::arrange::arrangement::ArrangeBySelf; use crate::trace::implementations::{KeyBuilder, KeySpine}; self.arrange_by_self_named(&format!("Arrange: {}", name)) @@ -906,7 +903,6 @@ pub mod vec { /// type is something other than an `isize` integer, for example perhaps an /// `i32`. pub fn count_core + 'static>(&self) -> Collection { - use crate::operators::arrange::arrangement::ArrangeBySelf; use crate::trace::implementations::{ValBuilder, ValSpine}; self.arrange_by_self_named("Arrange: Count") .reduce_abelian::<_,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -914,6 +910,174 @@ pub mod vec { } } + /// Methods which require data be arrangeable. + impl Collection + where + G: Scope, + D: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Semigroup, + { + /// Aggregates the weights of equal records into at most one record. + /// + /// This method uses the type `D`'s `hashed()` method to partition the data. The data are + /// accumulated in place, each held back until their timestamp has completed. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(1 .. 10u32).1; + /// + /// x.negate() + /// .concat(&x) + /// .consolidate() // <-- ensures cancellation occurs + /// .assert_empty(); + /// }); + /// ``` + pub fn consolidate(&self) -> Self { + use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; + self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) + } + + /// As `consolidate` but with the ability to name the operator, specify the trace type, + /// and provide the function `reify` to produce owned keys and values.. + pub fn consolidate_named(&self, name: &str, reify: F) -> Self + where + Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, + Tr: for<'a> crate::trace::Trace+'static, + Bu: crate::trace::Builder, + F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, + { + use crate::operators::arrange::arrangement::Arrange; + self.map(|k| (k, ())) + .arrange_named::(name) + .as_collection(reify) + } + + /// Aggregates the weights of equal records. + /// + /// Unlike `consolidate`, this method does not exchange data and does not + /// ensure that at most one copy of each `(data, time)` pair exists in the + /// results. Instead, it acts on each batch of data and collapses equivalent + /// `(data, time)` pairs found therein, suppressing any that accumulate to + /// zero. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(1 .. 10u32).1; + /// + /// // nothing to assert, as no particular guarantees. + /// x.negate() + /// .concat(&x) + /// .consolidate_stream(); + /// }); + /// ``` + pub fn consolidate_stream(&self) -> Self { + + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::Operator; + use crate::collection::AsCollection; + use crate::consolidation::ConsolidatingContainerBuilder; + + self.inner + .unary::, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| { + + move |input, output| { + input.for_each(|time, data| { + output.session_with_builder(&time).give_iterator(data.drain(..)); + }) + } + }) + .as_collection() + } + } + + use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder}; + use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder}; + use crate::operators::arrange::Arrange; + + impl Arrange> for Collection + where + G: Scope, + K: crate::ExchangeData + Hashable, + V: crate::ExchangeData, + R: crate::ExchangeData + Semigroup, + { + fn arrange_named(&self, name: &str) -> Arranged> + where + Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, + { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) + } + } + + impl Arrange> for Collection + where + G: Scope, + { + fn arrange_named(&self, name: &str) -> Arranged> + where + Ba: crate::trace::Batcher, Time=G::Timestamp> + 'static, + Bu: crate::trace::Builder, + Tr: crate::trace::Trace + 'static, + { + let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); + crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) + } + } + + + impl Collection + where + G: Scope, + { + /// Arranges a collection of `(Key, Val)` records by `Key`. + /// + /// This operator arranges a stream of values into a shared trace, whose contents it maintains. + /// This trace is current for all times completed by the output stream, which can be used to + /// safely identify the stable times and values in the trace. + pub fn arrange_by_key(&self) -> Arranged>> { + self.arrange_by_key_named("ArrangeByKey") + } + + /// As `arrange_by_key` but with the ability to name the arrangement. + pub fn arrange_by_key_named(&self, name: &str) -> Arranged>> { + self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) + } + } + + impl Collection + where + G: Scope, + { + /// Arranges a collection of `Key` records by `Key`. + /// + /// This operator arranges a collection of records into a shared trace, whose contents it maintains. + /// This trace is current for all times complete in the output stream, which can be used to safely + /// identify the stable times and values in the trace. + pub fn arrange_by_self(&self) -> Arranged>> { + self.arrange_by_self_named("ArrangeBySelf") + } + + /// As `arrange_by_self` but with the ability to name the arrangement. + pub fn arrange_by_self_named(&self, name: &str) -> Arranged>> { + self.map(|k| (k, ())) + .arrange_named::,KeyBuilder<_,_,_>,_>(name) + } + } + + } /// Conversion to a differential dataflow Collection. diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index d40ccd4bf..e1c2fde5f 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -185,7 +185,6 @@ impl TraceAgent { /// ``` /// use timely::Config; /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine}; /// @@ -237,7 +236,6 @@ impl TraceAgent { /// use timely::dataflow::ProbeHandle; /// use timely::dataflow::operators::Probe; /// use differential_dataflow::input::InputSession; - /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::trace::Trace; /// /// ::timely::execute(Config::thread(), |worker| { @@ -341,7 +339,6 @@ impl TraceAgent { /// use timely::dataflow::operators::Probe; /// use timely::dataflow::operators::Inspect; /// use differential_dataflow::input::InputSession; - /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::TraceReader; /// use differential_dataflow::input::Input; diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 26dbb1e07..34f494ec0 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -21,16 +21,15 @@ use timely::dataflow::operators::{Enter, Map}; use timely::order::PartialOrder; use timely::dataflow::{Scope, Stream, StreamCore}; use timely::dataflow::operators::generic::Operator; -use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; +use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline}; use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use crate::{Data, ExchangeData, VecCollection, AsCollection, Hashable}; +use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine}; use crate::trace::implementations::merge_batcher::container::MergerChunk; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; @@ -328,24 +327,6 @@ where ; } -impl Arrange> for VecCollection -where - G: Scope, - K: ExchangeData + Hashable, - V: ExchangeData, - R: ExchangeData + Semigroup, -{ - fn arrange_named(&self, name: &str) -> Arranged> - where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, - Tr: Trace + 'static, - { - let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) - } -} - /// Arranges a stream of updates by a key, configured with a name and a parallelization contract. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. @@ -504,86 +485,3 @@ where Arranged { stream, trace: reader.unwrap() } } - -impl Arrange> for VecCollection -where - G: Scope, -{ - fn arrange_named(&self, name: &str) -> Arranged> - where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, - Tr: Trace + 'static, - { - let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) - } -} - -/// Arranges something as `(Key,Val)` pairs according to a type `T` of trace. -/// -/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed -/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the -/// pair `(u64, K)` of hash value and key. -pub trait ArrangeByKey -where - G: Scope, -{ - /// Arranges a collection of `(Key, Val)` records by `Key`. - /// - /// This operator arranges a stream of values into a shared trace, whose contents it maintains. - /// This trace is current for all times completed by the output stream, which can be used to - /// safely identify the stable times and values in the trace. - fn arrange_by_key(&self) -> Arranged>>; - - /// As `arrange_by_key` but with the ability to name the arrangement. - fn arrange_by_key_named(&self, name: &str) -> Arranged>>; -} - -impl ArrangeByKey for VecCollection -where - G: Scope, -{ - fn arrange_by_key(&self) -> Arranged>> { - self.arrange_by_key_named("ArrangeByKey") - } - - fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) - } -} - -/// Arranges something as `(Key, ())` pairs according to a type `T` of trace. -/// -/// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed -/// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the -/// pair `(u64, K)` of hash value and key. -pub trait ArrangeBySelf -where - G: Scope, -{ - /// Arranges a collection of `Key` records by `Key`. - /// - /// This operator arranges a collection of records into a shared trace, whose contents it maintains. - /// This trace is current for all times complete in the output stream, which can be used to safely - /// identify the stable times and values in the trace. - fn arrange_by_self(&self) -> Arranged>>; - - /// As `arrange_by_self` but with the ability to name the arrangement. - fn arrange_by_self_named(&self, name: &str) -> Arranged>>; -} - - -impl ArrangeBySelf for VecCollection -where - G: Scope, -{ - fn arrange_by_self(&self) -> Arranged>> { - self.arrange_by_self_named("ArrangeBySelf") - } - - fn arrange_by_self_named(&self, name: &str) -> Arranged>> { - self.map(|k| (k, ())) - .arrange_named::,KeyBuilder<_,_,_>,_>(name) - } -} diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 2b7f4ba9a..70f9fb866 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -69,4 +69,4 @@ pub mod upsert; pub use self::writer::TraceWriter; pub use self::agent::{TraceAgent, ShutdownButton}; -pub use self::arrangement::{Arranged, Arrange, ArrangeByKey, ArrangeBySelf}; \ No newline at end of file +pub use self::arrangement::{Arranged, Arrange}; \ No newline at end of file diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs deleted file mode 100644 index ef7adef89..000000000 --- a/differential-dataflow/src/operators/consolidate.rs +++ /dev/null @@ -1,106 +0,0 @@ -//! Aggregates the weights of equal records into at most one record. -//! -//! As differential dataflow streams are unordered and taken to be the accumulation of all records, -//! no semantic change happens via `consolidate`. However, there is a practical difference between -//! a collection that aggregates down to zero records, and one that actually has no records. The -//! underlying system can more clearly see that no work must be done in the later case, and we can -//! drop out of, e.g. iterative computations. - -use timely::dataflow::Scope; - -use crate::{VecCollection, ExchangeData, Hashable}; -use crate::consolidation::ConsolidatingContainerBuilder; -use crate::difference::Semigroup; - -use crate::Data; -use crate::lattice::Lattice; -use crate::trace::{Batcher, Builder}; - -/// Methods which require data be arrangeable. -impl VecCollection -where - G: Scope, - D: ExchangeData+Hashable, - R: Semigroup+ExchangeData, -{ - /// Aggregates the weights of equal records into at most one record. - /// - /// This method uses the type `D`'s `hashed()` method to partition the data. The data are - /// accumulated in place, each held back until their timestamp has completed. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(1 .. 10u32).1; - /// - /// x.negate() - /// .concat(&x) - /// .consolidate() // <-- ensures cancellation occurs - /// .assert_empty(); - /// }); - /// ``` - pub fn consolidate(&self) -> Self { - use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) - } - - /// As `consolidate` but with the ability to name the operator, specify the trace type, - /// and provide the function `reify` to produce owned keys and values.. - pub fn consolidate_named(&self, name: &str, reify: F) -> Self - where - Ba: Batcher, Time=G::Timestamp> + 'static, - Tr: for<'a> crate::trace::Trace+'static, - Bu: Builder, - F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, - { - use crate::operators::arrange::arrangement::Arrange; - self.map(|k| (k, ())) - .arrange_named::(name) - .as_collection(reify) - } - - /// Aggregates the weights of equal records. - /// - /// Unlike `consolidate`, this method does not exchange data and does not - /// ensure that at most one copy of each `(data, time)` pair exists in the - /// results. Instead, it acts on each batch of data and collapses equivalent - /// `(data, time)` pairs found therein, suppressing any that accumulate to - /// zero. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(1 .. 10u32).1; - /// - /// // nothing to assert, as no particular guarantees. - /// x.negate() - /// .concat(&x) - /// .consolidate_stream(); - /// }); - /// ``` - pub fn consolidate_stream(&self) -> Self { - - use timely::dataflow::channels::pact::Pipeline; - use timely::dataflow::operators::Operator; - use crate::collection::AsCollection; - - self.inner - .unary::, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| { - - move |input, output| { - input.for_each(|time, data| { - output.session_with_builder(&time).give_iterator(data.drain(..)); - }) - } - }) - .as_collection() - } -} diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 2fcf42050..c7455bc3e 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -10,7 +10,7 @@ use crate::{ExchangeData, VecCollection}; use crate::difference::{IsZero, Semigroup}; use crate::hashable::Hashable; use crate::collection::AsCollection; -use crate::operators::arrange::{Arranged, ArrangeBySelf}; +use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `count` differential dataflow method. diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index a07098d0e..3b01174a5 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -18,7 +18,7 @@ use crate::hashable::Hashable; use crate::{Data, ExchangeData, VecCollection}; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; -use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf}; +use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor}; use crate::operators::ValueHistory; @@ -209,7 +209,6 @@ pub trait JoinCore, K: 'static + ?Sized, V: 'st /// /// ``` /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::arrange::ArrangeByKey; /// use differential_dataflow::operators::join::JoinCore; /// use differential_dataflow::trace::Trace; /// @@ -249,7 +248,6 @@ pub trait JoinCore, K: 'static + ?Sized, V: 'st /// /// ``` /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::arrange::ArrangeByKey; /// use differential_dataflow::operators::join::JoinCore; /// use differential_dataflow::trace::Trace; /// diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 4ceba6754..0f21555ae 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -11,7 +11,6 @@ pub use self::threshold::ThresholdTotal; pub mod arrange; pub mod reduce; -pub mod consolidate; pub mod iterate; pub mod join; pub mod count; diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index 91a4039b7..39d5b9041 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -13,7 +13,7 @@ use crate::{ExchangeData, VecCollection}; use crate::difference::{Semigroup, Abelian}; use crate::hashable::Hashable; use crate::collection::AsCollection; -use crate::operators::arrange::{Arranged, ArrangeBySelf}; +use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index f6d1306cd..fee3d0696 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -4,7 +4,6 @@ use timely::progress::frontier::AntichainRef; use differential_dataflow::input::InputSession; use differential_dataflow::collection::AsCollection; -use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::trace::TraceReader; use itertools::Itertools; diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index 59df54f23..4d00c5df2 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -29,13 +29,11 @@ fn main() { let (edges_input, edges) = scope.new_collection(); // Graph oriented both ways, indexed by key. - use differential_dataflow::operators::arrange::ArrangeByKey; let forward_key = edges.arrange_by_key(); let reverse_key = edges.map(|(x,y)| (y,x)) .arrange_by_key(); // Graph oriented both ways, indexed by (key, val). - use differential_dataflow::operators::arrange::ArrangeBySelf; let forward_self = edges.arrange_by_self(); let reverse_self = edges.map(|(x,y)| (y,x)) .arrange_by_self(); diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs index 4b1619b5e..0585f58b8 100644 --- a/dogsdogsdogs/examples/delta_query2.rs +++ b/dogsdogsdogs/examples/delta_query2.rs @@ -25,7 +25,6 @@ fn main() { let edges2 = data2.as_collection(); // Graph oriented both ways, indexed by key. - use differential_dataflow::operators::arrange::ArrangeByKey; let forward1 = edges1.arrange_by_key(); let forward2 = edges2.arrange_by_key(); diff --git a/dogsdogsdogs/examples/ngo.rs b/dogsdogsdogs/examples/ngo.rs index d5291b678..d3bea651e 100644 --- a/dogsdogsdogs/examples/ngo.rs +++ b/dogsdogsdogs/examples/ngo.rs @@ -6,9 +6,6 @@ use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeBySelf; -use differential_dataflow::operators::arrange::ArrangeByKey; - use graph_map::GraphMMap; type Node = u32; diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 7d9f577ca..7b63ad202 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -9,7 +9,6 @@ use differential_dataflow::{ExchangeData, VecCollection, AsCollection}; use differential_dataflow::difference::{Monoid, Multiply}; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::TraceAgent; -use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey}; pub mod altneu; pub mod calculus; diff --git a/doop/src/main.rs b/doop/src/main.rs index 1f7e16983..00d526e95 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -13,7 +13,6 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::input::Input; use differential_dataflow::operators::iterate::VecVariable; use differential_dataflow::operators::{Join, JoinCore}; -use differential_dataflow::operators::arrange::ArrangeByKey; // Type aliases for differential execution. type Time = u32; diff --git a/experiments/src/bin/arrange.rs b/experiments/src/bin/arrange.rs index e864d2854..cf3f2bbc7 100644 --- a/experiments/src/bin/arrange.rs +++ b/experiments/src/bin/arrange.rs @@ -5,7 +5,6 @@ use timely::dataflow::operators::{Exchange, Probe}; // use timely::progress::timestamp::RootTimestamp; use differential_dataflow::input::Input; -use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::count::CountTotal; use differential_dataflow::operators::threshold::ThresholdTotal; diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index da053de68..585928892 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -7,7 +7,6 @@ use timely::WorkerConfig; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::implementations::ValSpine; diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 999c64e18..3a566b384 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -11,8 +11,6 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; type Node = usize; type Iter = usize; diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 39d009413..7314eee4a 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -7,9 +7,6 @@ use differential_dataflow::input::Input; use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -// use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; type Node = usize; diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index dc28a6c51..be6ee4b14 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -11,8 +11,6 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; type Node = usize; type Iter = usize; diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index c6d0ad36d..f38ff5932 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -9,8 +9,6 @@ use differential_dataflow::VecCollection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; type Node = usize; diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 1f764e81a..4378688e9 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -6,8 +6,6 @@ use timely::dataflow::operators::ToStream; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::iterate::SemigroupVariable; use differential_dataflow::AsCollection; diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index be4676628..b1c5a816d 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -5,8 +5,6 @@ use timely::dataflow::*; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; type Node = usize; diff --git a/experiments/src/bin/graspan-interactive.rs b/experiments/src/bin/graspan-interactive.rs index 92d5d04d5..1ea5002a0 100644 --- a/experiments/src/bin/graspan-interactive.rs +++ b/experiments/src/bin/graspan-interactive.rs @@ -1,13 +1,8 @@ use std::io::{BufRead, BufReader}; use std::fs::File; -// use timely::progress::nested::product::Product; -// use timely::dataflow::operators::{Accumulate, Inspect}; use differential_dataflow::input::Input; -// use differential_dataflow::trace::Trace; -// use differential_dataflow::trace::implementations::ord::OrdValSpine; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { diff --git a/interactive/src/command.rs b/interactive/src/command.rs index d795ec13a..5d110ea70 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -70,7 +70,6 @@ where worker.dataflow(|scope| { use timely::dataflow::operators::Probe; - use differential_dataflow::operators::arrange::ArrangeBySelf; use crate::plan::Render; let mut collections = std::collections::HashMap::new(); @@ -102,7 +101,6 @@ where Command::CreateInput(name, updates) => { use differential_dataflow::input::Input; - use differential_dataflow::operators::arrange::ArrangeBySelf; let (input, trace) = worker.dataflow(|scope| { let (input, collection) = scope.new_collection_from(updates.into_iter()); diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index 0e0ece76f..81266d95a 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -159,7 +159,6 @@ where // }); use differential_dataflow::collection::AsCollection; - use differential_dataflow::operators::arrange::ArrangeBySelf; let operates = operates.as_collection().arrange_by_self().trace; let channels = channels.as_collection().arrange_by_self().trace; let schedule = schedule.as_collection().arrange_by_self().trace; @@ -274,7 +273,6 @@ where }); use differential_dataflow::collection::AsCollection; - use differential_dataflow::operators::arrange::ArrangeBySelf; let batch = batch.as_collection().arrange_by_self().trace; let merge = merge.as_collection().arrange_by_self().trace; diff --git a/interactive/src/plan/join.rs b/interactive/src/plan/join.rs index 61696b43e..5c6596249 100644 --- a/interactive/src/plan/join.rs +++ b/interactive/src/plan/join.rs @@ -33,8 +33,6 @@ impl Render for Join { arrangements: &mut TraceManager, ) -> VecCollection, Diff> { - use differential_dataflow::operators::arrange::ArrangeByKey; - // acquire arrangements for each input. let keys1 = self.keys.iter().map(|key| key.0).collect::>(); let mut trace1 = diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index e0ae29f6c..deff18a4a 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -157,7 +157,6 @@ impl Render for Plan { Plan::Map(expressions) => expressions.render(scope, collections, arrangements), Plan::Distinct(distinct) => { - use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::trace::implementations::{KeyBuilder, KeySpine}; let input = diff --git a/interactive/src/plan/sfw.rs b/interactive/src/plan/sfw.rs index feca49b1a..7a1d51bb1 100644 --- a/interactive/src/plan/sfw.rs +++ b/interactive/src/plan/sfw.rs @@ -29,8 +29,6 @@ use serde::{Deserialize, Serialize}; use timely::dataflow::Scope; -use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey}; - use differential_dataflow::{VecCollection, ExchangeData}; use crate::plan::{Plan, Render}; use crate::{TraceManager, Time, Diff, Datum}; diff --git a/mdbook/src/chapter_5/chapter_5_1.md b/mdbook/src/chapter_5/chapter_5_1.md index 9afb06566..bf78ba68b 100644 --- a/mdbook/src/chapter_5/chapter_5_1.md +++ b/mdbook/src/chapter_5/chapter_5_1.md @@ -46,7 +46,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::JoinCore; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { diff --git a/mdbook/src/chapter_5/chapter_5_2.md b/mdbook/src/chapter_5/chapter_5_2.md index 5704d399e..4cdff11b6 100644 --- a/mdbook/src/chapter_5/chapter_5_2.md +++ b/mdbook/src/chapter_5/chapter_5_2.md @@ -11,7 +11,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::JoinCore; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { @@ -53,8 +52,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::JoinCore; -use differential_dataflow::operators::arrange::ArrangeByKey; -use differential_dataflow::operators::arrange::ArrangeBySelf; fn main() { @@ -108,7 +105,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::{Join, JoinCore}; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index a3d6f4587..5f1cdc5ef 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -11,7 +11,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::operators::JoinCore; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 15ca64228..d89856c61 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -24,7 +24,6 @@ extern crate differential_dataflow; use differential_dataflow::operators::Join; use differential_dataflow::operators::Iterate; -use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index 1ea57f7da..cb527b25d 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -9,7 +9,6 @@ use timely::dataflow::operators::generic::operator::source; use timely::progress::Antichain; use differential_dataflow::AsCollection; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::trace::TraceReader; use dd_server::{Environment, TraceHandle}; diff --git a/server/dataflows/reachability/src/lib.rs b/server/dataflows/reachability/src/lib.rs index 223363bcd..2c6bfb10c 100644 --- a/server/dataflows/reachability/src/lib.rs +++ b/server/dataflows/reachability/src/lib.rs @@ -3,7 +3,6 @@ use std::cell::RefCell; use differential_dataflow::input::Input; use differential_dataflow::operators::Iterate; -use differential_dataflow::operators::arrange::ArrangeBySelf; use dd_server::{Environment, TraceHandle}; diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index 197b41997..c03599b1d 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -11,7 +11,6 @@ use timely::dataflow::*; use timely::dataflow::operators::CapabilitySet; use differential_dataflow::VecCollection; -use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::arrange::ShutdownButton; pub mod types; diff --git a/tpchlike/src/queries/query10.rs b/tpchlike/src/queries/query10.rs index 6bfe90035..647cb3e9e 100644 --- a/tpchlike/src/queries/query10.rs +++ b/tpchlike/src/queries/query10.rs @@ -97,8 +97,6 @@ where { let arrangements = arrangements.in_scope(scope, experiment); - use differential_dataflow::operators::arrange::ArrangeBySelf; - experiment .lineitem(scope) .explode(|x| diff --git a/tpchlike/src/queries/query12.rs b/tpchlike/src/queries/query12.rs index 360d59b12..efaf74b2b 100644 --- a/tpchlike/src/queries/query12.rs +++ b/tpchlike/src/queries/query12.rs @@ -3,7 +3,6 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle as ProbeHandle; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey}; use differential_dataflow::difference::DiffPair; use differential_dataflow::lattice::Lattice; diff --git a/tpchlike/src/queries/query14.rs b/tpchlike/src/queries/query14.rs index f7750a746..0a8c550e1 100644 --- a/tpchlike/src/queries/query14.rs +++ b/tpchlike/src/queries/query14.rs @@ -3,7 +3,6 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle as ProbeHandle; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::difference::DiffPair; use differential_dataflow::lattice::Lattice; diff --git a/tpchlike/src/queries/query19.rs b/tpchlike/src/queries/query19.rs index 3cc000b9a..755559c82 100644 --- a/tpchlike/src/queries/query19.rs +++ b/tpchlike/src/queries/query19.rs @@ -3,7 +3,6 @@ use timely::dataflow::*; use timely::dataflow::operators::probe::Handle as ProbeHandle; use differential_dataflow::operators::*; -use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::lattice::Lattice; use {Arrangements, Experiment, Collections};