Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion differential-dataflow/examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use timely::scheduling::Scheduler;

use differential_dataflow::input::Input;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;

Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use timely::progress::frontier::AntichainRef;
use timely::dataflow::operators::Probe;

use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::trace::cursor::Cursor;
use differential_dataflow::trace::TraceReader;

Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use timely::dataflow::scopes::ScopeParent;
use differential_dataflow::VecCollection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::VecVariable;

type Node = usize;
Expand Down
3 changes: 0 additions & 3 deletions differential-dataflow/examples/interpreted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use differential_dataflow::VecCollection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;

use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::arrange::ArrangeByKey;

use graph_map::GraphMMap;

type Node = u32;
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/itembased_cf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::{Join,CountTotal};
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;

use rand::{Rng, SeedableRng, StdRng};
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/examples/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use timely::progress::frontier::AntichainRef;
use timely::PartialOrder;

use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::trace::{Cursor, TraceReader};

use pair::Pair;
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ where
G: Scope<Timestamp: Lattice+Ord>,
N: ExchangeData+Hash,
{
use crate::operators::arrange::arrangement::ArrangeByKey;
let edges = edges.arrange_by_key();
bfs_arranged(&edges, roots)
}
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ where
G: Scope<Timestamp: Lattice+Ord>,
N: ExchangeData+Hash,
{
use crate::operators::arrange::arrangement::ArrangeByKey;
let forward = edges.arrange_by_key();
let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
bidijkstra_arranged(&forward, &reverse, goals)
Expand Down
1 change: 0 additions & 1 deletion differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use timely::dataflow::*;
use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};
use crate::operators::arrange::arrangement::ArrangeByKey;

/// Propagates labels forward, retaining the minimum label.
///
Expand Down
172 changes: 168 additions & 4 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,6 @@ pub mod vec {
/// As `reduce` with the ability to name the operator.
pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
use crate::operators::arrange::arrangement::ArrangeByKey;
use crate::trace::implementations::{ValBuilder, ValSpine};

self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand Down Expand Up @@ -803,7 +802,6 @@ pub mod vec {
Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::arrange::arrangement::ArrangeByKey;
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core::<_,Bu,_>(name, logic)
}
Expand Down Expand Up @@ -867,7 +865,6 @@ pub mod vec {

/// A `threshold` with the ability to name the operator.
pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
use crate::operators::arrange::arrangement::ArrangeBySelf;
use crate::trace::implementations::{KeyBuilder, KeySpine};

self.arrange_by_self_named(&format!("Arrange: {}", name))
Expand Down Expand Up @@ -906,14 +903,181 @@ pub mod vec {
/// type is something other than an `isize` integer, for example perhaps an
/// `i32`.
pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
// NOTE(review): this trait import may be redundant if `arrange_by_self_named` is now an
// inherent method on `Collection` — confirm before removing.
use crate::operators::arrange::arrangement::ArrangeBySelf;
use crate::trace::implementations::{ValBuilder, ValSpine};
// Arrange the records by themselves under the operator name "Arrange: Count".
self.arrange_by_self_named("Arrange: Count")
// For each key, emit a single output `(s[0].1, 1)`: the second component of the first input
// entry (presumably the key's accumulated weight — TODO confirm), carried as the output
// value with an output difference of `R2::from(1i8)`.
.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
// Convert the arrangement back into a collection of owned `(key, count)` pairs.
.as_collection(|k,c| (k.clone(), c.clone()))
}
}

/// Methods available when the collection's data can be arranged.
impl<G, D, R> Collection<G, D, R>
where
    G: Scope<Timestamp: Data+Lattice>,
    D: crate::ExchangeData+Hashable,
    R: crate::ExchangeData+Semigroup,
{
    /// Combines the weights of equal records, leaving at most one record for each.
    ///
    /// The data are partitioned using the `hashed()` method of type `D`. They are
    /// accumulated in place, and each is held back until its timestamp has completed.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(1 .. 10u32).1;
    ///
    ///     x.negate()
    ///      .concat(&x)
    ///      .consolidate() // <-- ensures cancellation occurs
    ///      .assert_empty();
    /// });
    /// ```
    pub fn consolidate(&self) -> Self {
        use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
        self.consolidate_named::<KeyBatcher<_, _, _>, KeyBuilder<_, _, _>, KeySpine<_, _, _>, _>(
            "Consolidate",
            |record, &()| record.clone(),
        )
    }

    /// As `consolidate`, but the caller names the operator, chooses the batcher, builder,
    /// and trace types, and supplies `reify` to produce owned data from the trace's
    /// borrowed keys and values.
    pub fn consolidate_named<Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
    where
        Ba: crate::trace::Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
        Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
        Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
        F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
    {
        use crate::operators::arrange::arrangement::Arrange;

        // Pair every record with a unit value so that the record itself acts as the key.
        let keyed = self.map(|record| (record, ()));
        let arranged = keyed.arrange_named::<Ba, Bu, Tr>(name);
        arranged.as_collection(reify)
    }

    /// Combines the weights of equal records within each batch of data.
    ///
    /// In contrast to `consolidate`, no data are exchanged, and there is no promise
    /// that each `(data, time)` pair occurs at most once in the output. The method
    /// works batch by batch, collapsing equivalent `(data, time)` pairs within a
    /// batch and suppressing those that accumulate to zero.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(1 .. 10u32).1;
    ///
    ///     // nothing to assert, as no particular guarantees.
    ///     x.negate()
    ///      .concat(&x)
    ///      .consolidate_stream();
    /// });
    /// ```
    pub fn consolidate_stream(&self) -> Self {

        use timely::dataflow::channels::pact::Pipeline;
        use timely::dataflow::operators::Operator;
        use crate::collection::AsCollection;
        use crate::consolidation::ConsolidatingContainerBuilder;

        let consolidated = self.inner.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
            Pipeline,
            "ConsolidateStream",
            |_capability, _operator_info| {
                // Forward each received batch through the consolidating builder at its own time.
                move |batches, results| {
                    batches.for_each(|timestamp, updates| {
                        results.session_with_builder(&timestamp).give_iterator(updates.drain(..));
                    })
                }
            },
        );
        consolidated.as_collection()
    }
}

use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
use crate::operators::arrange::Arrange;

/// Arranges collections of `(K, V)` pairs, exchanging updates by the hash of `K`.
impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice>,
    K: crate::ExchangeData + Hashable,
    V: crate::ExchangeData,
    R: crate::ExchangeData + Semigroup,
{
    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: crate::trace::Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
        Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
    {
        use timely::dataflow::channels::pact::Exchange;
        use crate::operators::arrange::arrangement::arrange_core;

        // The exchange pact is driven solely by the hash of the update's key.
        let by_key_hash = Exchange::new(
            move |((key, _val), _time, _diff): &((K, V), G::Timestamp, R)| key.hashed().into(),
        );
        arrange_core::<_, _, Ba, Bu, _>(&self.inner, by_key_hash, name)
    }
}

/// Arranges collections of bare `K` records by pairing each record with `()` and
/// exchanging updates by the hash of the record.
impl<G, K, R> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: crate::ExchangeData+Hashable,
    R: crate::ExchangeData+Semigroup,
{
    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: crate::trace::Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
        Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
        Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
    {
        use timely::dataflow::channels::pact::Exchange;
        use crate::operators::arrange::arrangement::arrange_core;

        // The exchange pact is driven solely by the hash of the record itself.
        let by_record_hash = Exchange::new(
            move |((record, _unit), _time, _diff): &((K, ()), G::Timestamp, R)| record.hashed().into(),
        );
        // Attach a unit value so the stream carries `(key, val)` shaped updates.
        let keyed = self.map(|record| (record, ()));
        arrange_core::<_, _, Ba, Bu, _>(&keyed.inner, by_record_hash, name)
    }
}


/// Key-based arrangement methods for collections of `(K, V)` pairs.
impl<G, K, V, R> Collection<G, (K,V), R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: crate::ExchangeData+Hashable,
    V: crate::ExchangeData,
    R: crate::ExchangeData+Semigroup,
{
    /// Arranges `(Key, Val)` records by their `Key`.
    ///
    /// The operator turns a stream of values into a shared trace and maintains the trace's
    /// contents. The trace is current for every time completed by the output stream, so that
    /// stream can be used to safely identify the stable times and values in the trace.
    pub fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        self.arrange_by_key_named("ArrangeByKey")
    }

    /// Same as `arrange_by_key`, except that the caller supplies the arrangement's name.
    pub fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        self.arrange_named::<
            ValBatcher<K, V, G::Timestamp, R>,
            ValBuilder<K, V, G::Timestamp, R>,
            ValSpine<K, V, G::Timestamp, R>,
        >(name)
    }
}

/// Self-keyed arrangement methods for collections of bare `K` records.
impl<G, K, R> Collection<G, K, R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: crate::ExchangeData+Hashable,
    R: crate::ExchangeData+Semigroup,
{
    /// Arranges `Key` records by the `Key` itself.
    ///
    /// The operator turns a collection of records into a shared trace and maintains the trace's
    /// contents. The trace is current for every time complete in the output stream, so that
    /// stream can be used to safely identify the stable times and values in the trace.
    pub fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
        self.arrange_by_self_named("ArrangeBySelf")
    }

    /// Same as `arrange_by_self`, except that the caller supplies the arrangement's name.
    pub fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
        // Pair each record with a unit value; the record then plays the role of the key.
        let keyed = self.map(|record| (record, ()));
        keyed.arrange_named::<
            KeyBatcher<K, G::Timestamp, R>,
            KeyBuilder<K, G::Timestamp, R>,
            KeySpine<K, G::Timestamp, R>,
        >(name)
    }
}


}

/// Conversion to a differential dataflow Collection.
Expand Down
3 changes: 0 additions & 3 deletions differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ impl<Tr: TraceReader+'static> TraceAgent<Tr> {
/// ```
/// use timely::Config;
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::arrange::ArrangeBySelf;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
///
Expand Down Expand Up @@ -237,7 +236,6 @@ impl<Tr: TraceReader+'static> TraceAgent<Tr> {
/// use timely::dataflow::ProbeHandle;
/// use timely::dataflow::operators::Probe;
/// use differential_dataflow::input::InputSession;
/// use differential_dataflow::operators::arrange::ArrangeBySelf;
/// use differential_dataflow::trace::Trace;
///
/// ::timely::execute(Config::thread(), |worker| {
Expand Down Expand Up @@ -341,7 +339,6 @@ impl<Tr: TraceReader+'static> TraceAgent<Tr> {
/// use timely::dataflow::operators::Probe;
/// use timely::dataflow::operators::Inspect;
/// use differential_dataflow::input::InputSession;
/// use differential_dataflow::operators::arrange::ArrangeBySelf;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::TraceReader;
/// use differential_dataflow::input::Input;
Expand Down
Loading