diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index db7a876bc..6cbbad407 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -6,7 +6,6 @@ use timely::scheduling::Scheduler; use differential_dataflow::input::Input; use differential_dataflow::AsCollection; -use differential_dataflow::operators::join::JoinCore; use differential_dataflow::operators::Iterate; fn main() { diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index a0134d7e2..fdfa201c2 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -4,7 +4,6 @@ use timely::dataflow::operators::*; use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::*; use graph_map::GraphMMap; diff --git a/differential-dataflow/examples/itembased_cf.rs b/differential-dataflow/examples/itembased_cf.rs index 0dd05eb1b..4909b4bf3 100644 --- a/differential-dataflow/examples/itembased_cf.rs +++ b/differential-dataflow/examples/itembased_cf.rs @@ -1,6 +1,5 @@ use differential_dataflow::input::InputSession; -use differential_dataflow::operators::{Join,CountTotal}; -use differential_dataflow::operators::join::JoinCore; +use differential_dataflow::operators::CountTotal; use rand::{Rng, SeedableRng, StdRng}; diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 786b9c4b9..76bf192ed 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -6,7 +6,6 @@ use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; -use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; type Node = u32; diff --git a/differential-dataflow/examples/pagerank.rs 
b/differential-dataflow/examples/pagerank.rs index d72bc6712..b80f0b7da 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -3,7 +3,7 @@ use timely::dataflow::{*, operators::Filter}; use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::{*, iterate::Variable}; +use differential_dataflow::operators::iterate::Variable; use differential_dataflow::input::InputSession; use differential_dataflow::AsCollection; diff --git a/differential-dataflow/examples/projekt.rs b/differential-dataflow/examples/projekt.rs index f9929bc3a..e611f4773 100644 --- a/differential-dataflow/examples/projekt.rs +++ b/differential-dataflow/examples/projekt.rs @@ -1,7 +1,6 @@ use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::InputSession; -use differential_dataflow::operators::*; fn main() { diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 387fdbe20..96425dad0 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -6,7 +6,6 @@ use timely::order::Product; use timely::dataflow::*; use crate::{VecCollection, ExchangeData}; -use crate::operators::*; use crate::lattice::Lattice; use crate::operators::iterate::Variable; diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 8b078eb14..6fc091cab 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -1077,7 +1077,163 @@ pub mod vec { } } + impl Collection + where + G: Scope, + K: crate::ExchangeData+Hashable, + V: crate::ExchangeData, + R: crate::ExchangeData+Semigroup, + { + /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`. 
+ /// + /// The [`join_map`](Self::join_map) method may be more convenient for non-trivial processing pipelines. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; + /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1; + /// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1; + /// + /// x.join(&y) + /// .assert_eq(&z); + /// }); + /// ``` + pub fn join(&self, other: &Collection) -> Collection>::Output> + where + K: crate::ExchangeData, + V2: crate::ExchangeData, + R2: crate::ExchangeData+Semigroup, + R: Multiply, + { + self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone()))) + } + + /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; + /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1; + /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1; + /// + /// x.join_map(&y, |_key, &a, &b| (a,b)) + /// .assert_eq(&z); + /// }); + /// ``` + pub fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { + let arranged1 = self.arrange_by_key(); + let arranged2 = other.arrange_by_key(); + arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) + } + + /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. + /// + /// When the second collection contains frequencies that are either zero or one this is the more traditional + /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up + /// the counts of the records in the first input. 
+ /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; + /// let y = scope.new_collection_from(vec![0, 2]).1; + /// let z = scope.new_collection_from(vec![(0, 1)]).1; + /// + /// x.semijoin(&y) + /// .assert_eq(&z); + /// }); + /// ``` + pub fn semijoin(&self, other: &Collection) -> Collection>::Output> + where R: Multiply { + let arranged1 = self.arrange_by_key(); + let arranged2 = other.arrange_by_self(); + arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) + } + /// Subtracts the semijoin with `other` from `self`. + /// + /// In the case that `other` has multiplicities zero or one this results + /// in a relational antijoin, in which we discard input records whose key + /// is present in `other`. If the multiplicities could be other than zero + /// or one, the semantic interpretation of this operator is less clear. + /// + /// In almost all cases, you should ensure that `other` has multiplicities + /// that are zero or one, perhaps by using the `distinct` operator. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; + /// let y = scope.new_collection_from(vec![0, 2]).1; + /// let z = scope.new_collection_from(vec![(1, 3)]).1; + /// + /// x.antijoin(&y) + /// .assert_eq(&z); + /// }); + /// ``` + pub fn antijoin(&self, other: &Collection) -> Collection + where R: Multiply, R: Abelian+'static { + self.concat(&self.semijoin(other).negate()) + } + + /// Joins two arranged collections with the same key type. 
+ /// + /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, + /// which produces something implementing `IntoIterator`, where the output collection will have an entry for + /// every value returned by the iterator. + /// + /// This method arranges `self` by key and then joins it with the already-arranged `stream2`, by calling + /// the `join_core` method of the resulting arrangement. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// use differential_dataflow::trace::Trace; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1 + /// .arrange_by_key(); + /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1 + /// .arrange_by_key(); + /// + /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1; + /// + /// x.join_core(&y, |_key, &a, &b| Some((a, b))) + /// .assert_eq(&z); + /// }); + /// ``` + pub fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> + where + Tr2: for<'a> crate::trace::TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, + R: Multiply, + I: IntoIterator, + L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, + { + self.arrange_by_key() + .join_core(stream2, result) + } + } } /// Conversion to a differential dataflow Collection. diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 34f494ec0..8a1a1fbe3 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -142,6 +142,19 @@ where self.flat_map_ref(move |key, val| Some(logic(key,val))) } + /// Flattens the stream into a `Collection`. 
+ /// + /// The underlying `Stream>` is a much more efficient way to access the data, + /// and this method should only be used when the data need to be transformed or exchanged, rather than + /// supplied as arguments to an operator using the same key-value structure. + pub fn as_vecs(&self) -> VecCollection + where + Tr::KeyOwn: crate::ExchangeData, + Tr::ValOwn: crate::ExchangeData, + { + self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))]) + } + /// Extracts elements from an arrangement as a collection. /// /// The supplied logic may produce an iterator over output values, allowing either @@ -198,7 +211,9 @@ where G: Scope, T1: TraceReader + Clone + 'static, { - /// A direct implementation of the `JoinCore::join_core` method. + /// A convenience method to join and produce `VecCollection` output. + /// + /// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion. pub fn join_core(&self, other: &Arranged, mut result: L) -> VecCollection>::Output> where T2: for<'a> TraceReader=T1::Key<'a>,Time=T1::Time>+Clone+'static, @@ -206,22 +221,12 @@ where I: IntoIterator, L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static { - let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| { + let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| { let t = t.clone(); let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) }; - self.join_core_internal_unsafe(other, result) - } - /// A direct implementation of the `JoinCore::join_core_internal_unsafe` method. 
- pub fn join_core_internal_unsafe (&self, other: &Arranged, mut result: L) -> VecCollection - where - T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static, - D: Data, - ROut: Semigroup+'static, - I: IntoIterator, - L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, - { + use crate::operators::join::join_traces; join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>( self, diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 3b01174a5..65d353d5c 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -14,9 +14,6 @@ use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Sessi use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::hashable::Hashable; -use crate::{Data, ExchangeData, VecCollection}; -use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::operators::arrange::Arranged; use crate::trace::{BatchReader, Cursor}; @@ -24,288 +21,6 @@ use crate::operators::ValueHistory; use crate::trace::TraceReader; -/// Join implementations for `(key,val)` data. -pub trait Join { - - /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`. - /// - /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines. 
- /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Join; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; - /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1; - /// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1; - /// - /// x.join(&y) - /// .assert_eq(&z); - /// }); - /// ``` - fn join(&self, other: &VecCollection) -> VecCollection>::Output> - where - K: ExchangeData, - V2: ExchangeData, - R2: ExchangeData+Semigroup, - R: Multiply, - { - self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone()))) - } - - /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Join; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; - /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1; - /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1; - /// - /// x.join_map(&y, |_key, &a, &b| (a,b)) - /// .assert_eq(&z); - /// }); - /// ``` - fn join_map(&self, other: &VecCollection, logic: L) -> VecCollection>::Output> - where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, D: Data, L: FnMut(&K, &V, &V2)->D+'static; - - /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. - /// - /// When the second collection contains frequencies that are either zero or one this is the more traditional - /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up - /// the counts of the records in the first input. 
- /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Join; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; - /// let y = scope.new_collection_from(vec![0, 2]).1; - /// let z = scope.new_collection_from(vec![(0, 1)]).1; - /// - /// x.semijoin(&y) - /// .assert_eq(&z); - /// }); - /// ``` - fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> - where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply; - - /// Subtracts the semijoin with `other` from `self`. - /// - /// In the case that `other` has multiplicities zero or one this results - /// in a relational antijoin, in which we discard input records whose key - /// is present in `other`. If the multiplicities could be other than zero - /// or one, the semantic interpretation of this operator is less clear. - /// - /// In almost all cases, you should ensure that `other` has multiplicities - /// that are zero or one, perhaps by using the `distinct` operator. 
- /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Join; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1; - /// let y = scope.new_collection_from(vec![0, 2]).1; - /// let z = scope.new_collection_from(vec![(1, 3)]).1; - /// - /// x.antijoin(&y) - /// .assert_eq(&z); - /// }); - /// ``` - fn antijoin(&self, other: &VecCollection) -> VecCollection - where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, R: Abelian+'static; -} - -impl Join for VecCollection -where - G: Scope, - K: ExchangeData+Hashable, - V: ExchangeData, - R: ExchangeData+Semigroup, -{ - fn join_map(&self, other: &VecCollection, mut logic: L) -> VecCollection>::Output> - where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { - let arranged1 = self.arrange_by_key(); - let arranged2 = other.arrange_by_key(); - arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) - } - - fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> - where R: Multiply { - let arranged1 = self.arrange_by_key(); - let arranged2 = other.arrange_by_self(); - arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) - } - - fn antijoin(&self, other: &VecCollection) -> VecCollection - where R: Multiply, R: Abelian+'static { - self.concat(&self.semijoin(other).negate()) - } -} - -impl Join for Arranged -where - G: Scope, - Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>+Clone+'static, - K: ExchangeData+Hashable, - V: Data + 'static, -{ - fn join_map(&self, other: &VecCollection, mut logic: L) -> VecCollection>::Output> - where - Tr::Diff: Multiply, - L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, - { - let arranged2 = other.arrange_by_key(); - self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) - } - - fn semijoin(&self, other: &VecCollection) -> VecCollection>::Output> - where Tr::Diff: Multiply { - let arranged2 = 
other.arrange_by_self(); - self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) - } - - fn antijoin(&self, other: &VecCollection) -> VecCollection - where Tr::Diff: Multiply, Tr::Diff: Abelian+'static { - self.as_collection(|k,v| (k.clone(), v.clone())) - .concat(&self.semijoin(other).negate()) - } -} - -/// Matches the elements of two arranged traces. -/// -/// This method is used by the various `join` implementations, but it can also be used -/// directly in the event that one has a handle to an `Arranged`, perhaps because -/// the arrangement is available for re-use, or from the output of a `reduce` operator. -pub trait JoinCore, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> { - - /// Joins two arranged collections with the same key type. - /// - /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, - /// which produces something implementing `IntoIterator`, where the output collection will have an entry for - /// every value returned by the iterator. - /// - /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait - /// contains the implementations for collections. 
- /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::join::JoinCore; - /// use differential_dataflow::trace::Trace; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1 - /// .arrange_by_key(); - /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1 - /// .arrange_by_key(); - /// - /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1; - /// - /// x.join_core(&y, |_key, &a, &b| Some((a, b))) - /// .assert_eq(&z); - /// }); - /// ``` - fn join_core (&self, stream2: &Arranged, result: L) -> VecCollection>::Output> - where - Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - R: Multiply, - I: IntoIterator, - L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, - ; - - /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and - /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more - /// flexibility, but is more error-prone. - /// - /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function, - /// which produces something implementing `IntoIterator`, where the output collection will have an entry - /// for every value returned by the iterator. - /// - /// This trait is implemented for arrangements (`Arranged`) rather than collections. The `Join` trait - /// contains the implementations for collections. 
- /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::join::JoinCore; - /// use differential_dataflow::trace::Trace; - /// - /// ::timely::example(|scope| { - /// - /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1 - /// .arrange_by_key(); - /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1 - /// .arrange_by_key(); - /// - /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1; - /// - /// // Returned values have weight `a` - /// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a))) - /// .assert_eq(&z); - /// }); - /// ``` - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> VecCollection - where - Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - D: Data, - ROut: Semigroup+'static, - I: IntoIterator, - L: for<'a> FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, - ; -} - - -impl JoinCore for VecCollection -where - G: Scope, - K: ExchangeData+Hashable, - V: ExchangeData, - R: ExchangeData+Semigroup, -{ - fn join_core (&self, stream2: &Arranged, result: L) -> VecCollection>::Output> - where - Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - R: Multiply, - I: IntoIterator, - L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, - { - self.arrange_by_key() - .join_core(stream2, result) - } - - fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> VecCollection - where - Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - I: IntoIterator, - L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, - D: Data, - ROut: Semigroup+'static, - { - self.arrange_by_key().join_core_internal_unsafe(stream2, result) - } -} - /// The session passed to join closures. 
pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder, CT>; diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 0f21555ae..e511603e3 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -5,7 +5,6 @@ //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). pub use self::iterate::Iterate; -pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; pub use self::threshold::ThresholdTotal; diff --git a/differential-dataflow/tests/join.rs b/differential-dataflow/tests/join.rs index 5e6b1f46c..7956518de 100644 --- a/differential-dataflow/tests/join.rs +++ b/differential-dataflow/tests/join.rs @@ -1,7 +1,6 @@ use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::Join; #[test] fn join() { diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index 4d00c5df2..9e197b5b9 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -1,7 +1,6 @@ use timely::dataflow::Scope; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; -use differential_dataflow::operators::JoinCore; use graph_map::GraphMMap; use differential_dogs3::altneu::AltNeu; diff --git a/dogsdogsdogs/examples/ngo.rs b/dogsdogsdogs/examples/ngo.rs index d3bea651e..d0beb6651 100644 --- a/dogsdogsdogs/examples/ngo.rs +++ b/dogsdogsdogs/examples/ngo.rs @@ -4,7 +4,6 @@ use timely::dataflow::operators::*; use differential_dataflow::VecCollection; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::*; use graph_map::GraphMMap; diff --git a/doop/src/main.rs b/doop/src/main.rs index 00d526e95..50bc35044 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -12,7 +12,6 @@ use 
differential_dataflow::ExchangeData as Data; use differential_dataflow::lattice::Lattice; use differential_dataflow::input::Input; use differential_dataflow::operators::iterate::VecVariable; -use differential_dataflow::operators::{Join, JoinCore}; // Type aliases for differential execution. type Time = u32; @@ -421,6 +420,7 @@ fn main() { // LoadInstanceField_To(?insn, ?to). let LoadInstanceField: VecCollection<_,(Var, Field, Var, Method)> = Instruction_Method + .as_vecs() .join(&LoadInstanceField_Base) .join(&FieldInstruction_Signature) .join(&LoadInstanceField_To) @@ -433,6 +433,7 @@ fn main() { // FieldInstruction_Signature(?insn, ?sig). let StoreInstanceField: VecCollection<_,(Var, Var, Field, Method)> = Instruction_Method + .as_vecs() .join(&StoreInstanceField_From) .join(&StoreInstanceField_Base) .join(&FieldInstruction_Signature) @@ -444,6 +445,7 @@ fn main() { // LoadStaticField_To(?insn, ?to). let LoadStaticField: VecCollection<_,(Field, Var, Method)> = Instruction_Method + .as_vecs() .join(&FieldInstruction_Signature) .join(&LoadStaticField_To) .map(|(_insn, ((inmethod, sig), to))| (sig, to, inmethod)); @@ -454,6 +456,7 @@ fn main() { // FieldInstruction_Signature(?insn, ?sig). let StoreStaticField: VecCollection<_,(Var, Field, Method)> = Instruction_Method + .as_vecs() .join(&StoreStaticField_From) .join(&FieldInstruction_Signature) .map(|(_insn, ((inmethod, from), sig))| (from, sig, inmethod)); @@ -464,6 +467,7 @@ fn main() { // LoadArrayIndex_To(?insn, ?to). let LoadArrayIndex: VecCollection<_,(Var, Var, Method)> = Instruction_Method + .as_vecs() .join(&LoadArrayIndex_Base) .join(&LoadArrayIndex_To) .map(|(_insn, ((inmethod, base), to))| (base, to, inmethod)); @@ -474,6 +478,7 @@ fn main() { // StoreArrayIndex_Base(?insn, ?base). 
let StoreArrayIndex: VecCollection<_,(Var, Var, Method)> = Instruction_Method + .as_vecs() .join(&StoreArrayIndex_From) .join(&StoreArrayIndex_Base) .map(|(_insn, ((inmethod, from), base))| (from, base, inmethod)); @@ -485,6 +490,7 @@ fn main() { // AssignCast_Type(?insn, ?type). let AssignCast: VecCollection<_,(Type, Var, Var, Method)> = Instruction_Method + .as_vecs() .join(&AssignCast_From) .join(&AssignInstruction_To) .join(&AssignCast_Type) @@ -496,6 +502,7 @@ fn main() { // AssignLocal_From(?insn, ?from). let AssignLocal: VecCollection<_,(Var, Var, Method)> = Instruction_Method + .as_vecs() .join(&AssignInstruction_To) .join(&AssignLocal_From) .map(|(_insn, ((inmethod, to), from))| (from, to, inmethod)); @@ -506,6 +513,7 @@ fn main() { // AssignInstruction_To(?insn, ?to). let AssignHeapAllocation: VecCollection<_,(HeapAllocation, Var, Method)> = Instruction_Method + .as_vecs() .join(&AssignHeapAllocation_Heap) .join(&AssignInstruction_To) .map(|(_insn, ((inmethod, heap), to))| (heap, to, inmethod)); @@ -515,6 +523,7 @@ fn main() { // ReturnNonvoid_Var(?insn, ?var). let ReturnVar: VecCollection<_,(Var, Method)> = Instruction_Method + .as_vecs() .join(&ReturnNonvoid_Var) .map(|(_insn, (inmethod, var))| (var, inmethod)); @@ -524,7 +533,7 @@ fn main() { // MethodInvocation_Method(?invocation, ?signature). 
let StaticMethodInvocation: VecCollection<_,(MethodInvocation, Method, Method)> = Instruction_Method - .semijoin(&isStaticMethodInvocation_Insn) + .join_core(&isStaticMethodInvocation_Insn.arrange_by_self(), |k,v,_| [(k.clone(), v.clone())]) .join(&MethodInvocation_Method) .map(|(invocation, (inmethod, sig))| (invocation, sig, inmethod)); diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index 585928892..17b053003 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -218,7 +218,7 @@ where G::Timestamp: Lattice{ .iterate(|inner| edges .enter(&inner.scope()) - .join_map(&inner, |_,&y,&q| (y,q)) + .join_core(&inner.arrange_by_key(), |_,&y,&q| [(y,q)]) .concat(&tc_1.enter(&inner.scope()).map(|x| (x,x))) .distinct() ) @@ -231,7 +231,7 @@ where G::Timestamp: Lattice{ edges .as_collection(|&k,&v| (v,k)) .enter(&inner.scope()) - .join_map(&inner, |_,&y,&q| (y,q)) + .join_core(&inner.arrange_by_key(), |_,&y,&q| [(y,q)]) .concat(&tc_2.enter(&inner.scope()).map(|x| (x,x))) .distinct() ) @@ -255,7 +255,7 @@ where G::Timestamp: Lattice{ let magic_edges = edges - .semijoin(&magic) + .join_core(&magic.arrange_by_self(), |k,v,_| [(k.clone(), v.clone())]) .map(|(x,y)|(y,x)) .semijoin(&magic) .map(|(x,y)|(y,x)); diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 7314eee4a..9f365e5d6 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -5,7 +5,6 @@ use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; -use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; type Node = usize; diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index be6ee4b14..1aed65bec 100644 --- 
a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -8,7 +8,6 @@ use timely::order::Product; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; -use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index f38ff5932..c7549fe96 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -6,7 +6,6 @@ use timely::order::Product; use differential_dataflow::input::Input; use differential_dataflow::VecCollection; -use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::iterate::Variable; diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 4378688e9..7255b5a72 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -146,7 +146,7 @@ fn bfs> ( let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let result = - graph.join_map(&inner, |_src,&dest,&dist| (dest, dist+1)) + graph.join_core(&inner.arrange_by_key(), |_src,&dest,&dist| [(dest, dist+1)]) .concat(&roots) .reduce(|_key, input, output| output.push((*input[0].0,1))); diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index b1c5a816d..ee5ed5791 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -123,7 +123,7 @@ fn bfs> ( let graph = graph.enter(&inner.scope()); let roots = roots.enter(&inner.scope()); - graph.join_map(&inner, |_src,&dest,&dist| (dest, dist+1)) + graph.join_core(&inner.arrange_by_key(), |_src,&dest,&dist| [(dest, dist+1)]) .concat(&roots) .reduce(|_key, input, output| output.push((*input[0].0,1))) }) diff --git a/experiments/src/bin/ysb.rs 
b/experiments/src/bin/ysb.rs index 9fa14063b..39ccb1779 100644 --- a/experiments/src/bin/ysb.rs +++ b/experiments/src/bin/ysb.rs @@ -1,7 +1,6 @@ use rand::{Rng, SeedableRng, StdRng}; use differential_dataflow::input::Input; -use differential_dataflow::operators::Join; #[derive(Clone)] pub enum AdType { diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index e031fb7aa..eabafe36b 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -11,7 +11,6 @@ extern crate timely; extern crate differential_dataflow; use differential_dataflow::input::InputSession; -use differential_dataflow::operators::Join; fn main() { diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index 320210f21..3d40b7694 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -10,7 +10,6 @@ As an example, our example program used `map` to reverse the pairs of identifier # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; # use differential_dataflow::lattice::Lattice; -# use differential_dataflow::operators::Join; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice # { diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 508c30398..965d5d380 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -9,7 +9,6 @@ Our example from earlier uses a join to match up pairs `(m2, m1)` and `(m1, p)` # extern crate differential_dataflow; # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; -# use differential_dataflow::operators::Join; # use differential_dataflow::lattice::Lattice; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index 4c1b2902a..8f830db79 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ 
b/mdbook/src/chapter_2/chapter_2_7.md @@ -9,7 +9,7 @@ As an example, we can take our `manages` relation and determine for all employee # extern crate differential_dataflow; # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; -# use differential_dataflow::operators::{Join, Iterate}; +# use differential_dataflow::operators::Iterate; # use differential_dataflow::lattice::Lattice; # fn example(manages: &VecCollection) # where G::Timestamp: Lattice @@ -45,7 +45,6 @@ In the example above, we could rewrite # extern crate differential_dataflow; # use timely::dataflow::Scope; # use differential_dataflow::VecCollection; -# use differential_dataflow::operators::Join; # use differential_dataflow::operators::{Iterate, iterate::VecVariable}; # use differential_dataflow::lattice::Lattice; # fn example(manages: &VecCollection) diff --git a/mdbook/src/chapter_5/chapter_5_1.md b/mdbook/src/chapter_5/chapter_5_1.md index bf78ba68b..52cbf0d41 100644 --- a/mdbook/src/chapter_5/chapter_5_1.md +++ b/mdbook/src/chapter_5/chapter_5_1.md @@ -8,8 +8,6 @@ Let's first build this naively, starting from two inputs: `knows` containing the extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::Join; - fn main() { // define a new timely dataflow computation. @@ -45,8 +43,6 @@ To arrange a collection, we just call one of several `arrange` methods. In this extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::JoinCore; - fn main() { // define a new timely dataflow computation. 
diff --git a/mdbook/src/chapter_5/chapter_5_2.md b/mdbook/src/chapter_5/chapter_5_2.md index 4cdff11b6..6dbcc0f46 100644 --- a/mdbook/src/chapter_5/chapter_5_2.md +++ b/mdbook/src/chapter_5/chapter_5_2.md @@ -10,8 +10,6 @@ We saw before an example where we used one type of arrangement, `arrange_by_key( extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::JoinCore; - fn main() { // define a new timely dataflow computation. @@ -51,8 +49,6 @@ We can show off arrangement by self in our "friends of friends" example by addin extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::JoinCore; - fn main() { // define a new timely dataflow computation. @@ -104,8 +100,6 @@ You may need to return from an arrangement to a collection (a stream of updates) extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::{Join, JoinCore}; - fn main() { // define a new timely dataflow computation. diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index 5f1cdc5ef..1ba28aff1 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -10,8 +10,6 @@ The following example demonstrates going from an interactive input session (`inp extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::JoinCore; - fn main() { // define a new timely dataflow computation. 
diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index d89856c61..549312cad 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -22,7 +22,6 @@ The following example demonstrates arranging the `knows` relation outside an ite extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::Join; use differential_dataflow::operators::Iterate; fn main() { @@ -47,7 +46,7 @@ fn main() { let knows = knows.enter(&reach.scope()); let query = query.enter(&reach.scope()); - knows.join_map(reach, |x,y,q| (*y,*q)) + knows.join_core(&reach.arrange_by_key(), |x,y,q| [(*y,*q)]) .concat(&query) .distinct() }); diff --git a/server/dataflows/neighborhood/src/lib.rs b/server/dataflows/neighborhood/src/lib.rs index 01d3b7f88..2796a11ec 100644 --- a/server/dataflows/neighborhood/src/lib.rs +++ b/server/dataflows/neighborhood/src/lib.rs @@ -2,7 +2,6 @@ use std::rc::Rc; use std::cell::RefCell; use differential_dataflow::input::Input; -use differential_dataflow::operators::JoinCore; use dd_server::{Environment, TraceHandle}; @@ -17,7 +16,7 @@ pub fn build((dataflow, handles, probe, _timer, args): Environment) -> Result<() .get_mut::>>>(&args[0])? .borrow_mut().as_mut().unwrap().import(dataflow); - let source = args[1].parse::().map_err(|_| format!("parse error, source: {:?}", args[1]))?; + let source = args[1].parse::().map_err(|_| format!("parse error, source: {:?}", args[1]))?; let (_input, query) = dataflow.new_collection_from(Some(source)); let timer = ::std::time::Instant::now();