Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use timely::scheduling::Scheduler;

use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;

fn main() {
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/interpreted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use timely::dataflow::operators::*;

use differential_dataflow::VecCollection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;

use graph_map::GraphMMap;

Expand Down
3 changes: 1 addition & 2 deletions differential-dataflow/examples/itembased_cf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::{Join,CountTotal};
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::CountTotal;

use rand::{Rng, SeedableRng, StdRng};

Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::VecCollection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

type Node = u32;
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use timely::dataflow::{*, operators::Filter};

use differential_dataflow::VecCollection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::{*, iterate::Variable};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::input::InputSession;
use differential_dataflow::AsCollection;

Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/projekt.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::InputSession;
use differential_dataflow::operators::*;

fn main() {

Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use timely::order::Product;
use timely::dataflow::*;

use crate::{VecCollection, ExchangeData};
use crate::operators::*;
use crate::lattice::Lattice;
use crate::operators::iterate::Variable;

Expand Down
156 changes: 156 additions & 0 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,163 @@ pub mod vec {
}
}

impl<G, K, V, R> Collection<G, (K, V), R>
where
G: Scope<Timestamp: Lattice+Ord>,
K: crate::ExchangeData+Hashable,
V: crate::ExchangeData,
R: crate::ExchangeData+Semigroup,
{
/// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
///
/// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
/// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
///
/// x.join(&y)
/// .assert_eq(&z);
/// });
/// ```
pub fn join<V2, R2>(&self, other: &Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
where
K: crate::ExchangeData,
V2: crate::ExchangeData,
R2: crate::ExchangeData+Semigroup,
R: Multiply<R2, Output: Semigroup+'static>,
{
self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
}

/// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
///
/// x.join_map(&y, |_key, &a, &b| (a,b))
/// .assert_eq(&z);
/// });
/// ```
pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
let arranged1 = self.arrange_by_key();
let arranged2 = other.arrange_by_key();
arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
}

/// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
///
/// When the second collection contains frequencies that are either zero or one this is the more traditional
/// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
/// the counts of the records in the first input.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
/// let y = scope.new_collection_from(vec![0, 2]).1;
/// let z = scope.new_collection_from(vec![(0, 1)]).1;
///
/// x.semijoin(&y)
/// .assert_eq(&z);
/// });
/// ```
pub fn semijoin<R2: crate::ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
where R: Multiply<R2, Output: Semigroup+'static> {
let arranged1 = self.arrange_by_key();
let arranged2 = other.arrange_by_self();
arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
}

/// Subtracts the semijoin with `other` from `self`.
///
/// In the case that `other` has multiplicities zero or one this results
/// in a relational antijoin, in which we discard input records whose key
/// is present in `other`. If the multiplicities could be other than zero
/// or one, the semantic interpretation of this operator is less clear.
///
/// In almost all cases, you should ensure that `other` has multiplicities
/// that are zero or one, perhaps by using the `distinct` operator.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
/// let y = scope.new_collection_from(vec![0, 2]).1;
/// let z = scope.new_collection_from(vec![(1, 3)]).1;
///
/// x.antijoin(&y)
/// .assert_eq(&z);
/// });
/// ```
pub fn antijoin<R2: crate::ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
where R: Multiply<R2, Output=R>, R: Abelian+'static {
self.concat(&self.semijoin(other).negate())
}

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
/// every value returned by the iterator.
///
/// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
/// contains the implementations for collections.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
/// use differential_dataflow::trace::Trace;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
/// .arrange_by_key();
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
/// .arrange_by_key();
///
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
///
/// x.join_core(&y, |_key, &a, &b| Some((a, b)))
/// .assert_eq(&z);
/// });
/// ```
pub fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
I: IntoIterator<Item: crate::Data>,
L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
{
self.arrange_by_key()
.join_core(stream2, result)
}
}
}

/// Conversion to a differential dataflow Collection.
Expand Down
31 changes: 18 additions & 13 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ where
self.flat_map_ref(move |key, val| Some(logic(key,val)))
}

/// Flattens the stream into a `Collection`.
///
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_vecs(&self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to mention I added this. We should use it all over the place instead of as_collection with logic that happens to do the same thing.

where
Tr::KeyOwn: crate::ExchangeData,
Tr::ValOwn: crate::ExchangeData,
{
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
}

/// Extracts elements from an arrangement as a collection.
///
/// The supplied logic may produce an iterator over output values, allowing either
Expand Down Expand Up @@ -198,30 +211,22 @@ where
G: Scope<Timestamp=T1::Time>,
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of the `JoinCore::join_core` method.
/// A convenience method to join and produce `VecCollection` output.
///
/// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
where
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
I: IntoIterator<Item: Data>,
L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
{
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}
/// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> VecCollection<G,D,ROut>
where
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
D: Data,
ROut: Semigroup+'static,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
{

use crate::operators::join::join_traces;
join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
self,
Expand Down
Loading