Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ jobs:
toolchain: ${{ matrix.toolchain }}
- name: Cargo test
run: cargo test --workspace --all-targets
- name: Cargo doc test
run: cargo test --doc

# Check formatting with rustfmt
mdbook:
Expand Down
84 changes: 84 additions & 0 deletions differential-dataflow/examples/iterate_container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//! Show an iterative scope example that uses a wrapper type around a container

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::order::Product;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use differential_dataflow::{AsCollection, Collection};
use differential_dataflow::input::Input;
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::collection::containers::{Enter, Leave, Negate, ResultsIn};

/// A wrapper around a container that implements the necessary traits to be used in iterative scopes.
///
/// This is a single-field newtype over the container `C`. The trait implementations that follow
/// in this example all forward to the wrapped value and, where a container is returned, wrap the
/// result again.
#[derive(Clone, Default)]
struct ContainerWrapper<C>(C);

/// Accounting queries are answered by the wrapped container.
impl<C: timely::container::Accountable> timely::container::Accountable for ContainerWrapper<C> {
    /// Forwards to the wrapped container's `record_count`.
    #[inline(always)]
    fn record_count(&self) -> i64 {
        timely::container::Accountable::record_count(&self.0)
    }

    /// Forwards to the wrapped container's `is_empty`.
    #[inline(always)]
    fn is_empty(&self) -> bool {
        timely::container::Accountable::is_empty(&self.0)
    }
}
impl<C: Enter<T1, T2>, T1, T2> Enter<T1, T2> for ContainerWrapper<C> {
type InnerContainer = ContainerWrapper<C::InnerContainer>;
#[inline(always)] fn enter(self) -> Self::InnerContainer { ContainerWrapper(self.0.enter()) }
}
impl<C: Leave<T1, T2>, T1, T2> Leave<T1, T2> for ContainerWrapper<C> {
type OuterContainer = ContainerWrapper<C::OuterContainer>;
#[inline(always)] fn leave(self) -> Self::OuterContainer { ContainerWrapper(self.0.leave()) }
}
/// Negation is performed by the wrapped container.
impl<C: Negate> Negate for ContainerWrapper<C> {
    /// Unwraps, calls `negate` on the inner container, and rewraps.
    #[inline(always)]
    fn negate(self) -> Self {
        let Self(inner) = self;
        Self(inner.negate())
    }
}
/// Applying a summary is performed by the wrapped container.
impl<C: ResultsIn<TS>, TS> ResultsIn<TS> for ContainerWrapper<C> {
    /// Unwraps, calls `results_in` with `step` on the inner container, and rewraps.
    #[inline(always)]
    fn results_in(self, step: &TS) -> Self {
        let Self(inner) = self;
        Self(inner.results_in(step))
    }
}

/// Re-emits every batch of `stream` inside a [`ContainerWrapper`].
///
/// Builds a "Wrap" operator with one `Pipeline` input and one output. Each incoming batch is
/// taken out of its buffer with `std::mem::take`, wrapped, and given to a session opened at the
/// batch's own `time`.
fn wrap<G: Scope, C: timely::Container>(stream: &StreamCore<G, C>) -> StreamCore<G, ContainerWrapper<C>> {
    let mut builder = OperatorBuilder::new(String::from("Wrap"), stream.scope());
    let (mut output, wrapped_stream) = builder.new_output();
    let mut input = builder.new_input(stream, Pipeline);

    builder.build(move |_capability| {
        move |_frontier| {
            let mut activated = output.activate();
            input.for_each(|time, data| {
                let mut session = activated.session(&time);
                let mut wrapped = ContainerWrapper(std::mem::take(data));
                session.give_container(&mut wrapped);
            });
        }
    });

    wrapped_stream
}


/// Runs a small iterative dataflow whose collections are backed by `ContainerWrapper`.
fn main() {
    timely::example(|scope| {

        // Build a collection from `1 .. 10` and rewrap its stream in `ContainerWrapper`.
        let source = scope.new_collection_from(1 .. 10u32).1;
        let wrapped: Collection<_, u32, isize, _> = wrap(&source.inner).as_collection();

        scope.iterative::<u64,_,_>(|nested| {
            let summary = Product::new(Default::default(), 1);
            let variable = Variable::new_from(wrapped.enter(nested), summary);

            // Halve every even value in place; odd values are left untouched.
            let halved: Collection<_, u32, isize, _> = variable
                .inner
                .unary(Pipeline, "Map", |_capability, _info| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            for (value, _time, _diff) in data.0.iter_mut() {
                                if *value % 2 == 0 {
                                    *value /= 2;
                                }
                            }
                            session.give_container(data);
                        });
                    }
                })
                .as_collection();

            // Strip the wrapper so that `consolidate` can be applied to the collection.
            let consolidated = halved
                .inner
                .unary(Pipeline, "Unwrap", |_capability, _info| {
                    |input, output| {
                        input.for_each(|time, data| {
                            let mut session = output.session(&time);
                            session.give_container(&mut data.0);
                        });
                    }
                })
                .as_collection()
                .consolidate();

            // Wrap again before closing the loop, then bring the result out of the nested scope.
            let rewrapped = wrap(&consolidated.inner).as_collection();
            variable.set(&rewrapped)
                .leave()
        });
    })
}
179 changes: 140 additions & 39 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,79 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
/// .assert_eq(&evens);
/// });
/// ```
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
// an inherent method on `Collection`.
pub fn negate(&self) -> Collection<G, D, R, C>
where
StreamCore<G, C>: crate::operators::Negate<G, C>
pub fn negate(&self) -> Collection<G, D, R, C> where C: containers::Negate {
use timely::dataflow::channels::pact::Pipeline;
self.inner
.unary(Pipeline, "Negate", move |_,_| move |input, output| {
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
})
.as_collection()
}

/// Brings a Collection into a nested scope.
///
/// The underlying stream enters `child`, and every batch is then converted with the
/// container's `containers::Enter::enter` implementation before being forwarded.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
///     let data = scope.new_collection_from(1 .. 10).1;
///
///     let result = scope.region(|child| {
///         data.enter(child)
///             .leave()
///     });
///
///     data.assert_eq(&result);
/// });
/// ```
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
where
    C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
    T: Refines<<G as ScopeParent>::Timestamp>,
{
    use timely::dataflow::channels::pact::Pipeline;

    let entered = self.inner.enter(child);
    entered
        .unary(Pipeline, "Enter", move |_capability, _info| {
            move |input, output| {
                input.for_each(|time, data| {
                    let mut session = output.session(&time);
                    // Take the batch out of its buffer and convert it for the child scope.
                    let mut converted = std::mem::take(data).enter();
                    session.give_container(&mut converted);
                });
            }
        })
        .as_collection()
}

/// Advances the timestamps in the stream by `step`, according to the timestamp actions on the path.
///
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
///
/// `step` is handed by reference to the container's `containers::ResultsIn::results_in`
/// implementation for every batch.
///
/// # Examples
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
///
/// use differential_dataflow::input::Input;
///
/// timely::example(|scope| {
/// let summary1 = 5;
///
/// let data = scope.new_collection_from(1 .. 10).1;
/// // Applies `results_in` on every timestamp in the collection.
/// data.results_in(summary1);
/// });
/// ```
pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
where
C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
{
crate::operators::Negate::negate(&self.inner).as_collection()
use timely::dataflow::channels::pact::Pipeline;
self.inner
// Each batch is taken out of its buffer, advanced by `step`, and given to a session
// opened at the batch's own `time`.
.unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
})
.as_collection()
}
}

Expand Down Expand Up @@ -379,36 +445,6 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
.as_collection()
}

/// Brings a Collection into a nested scope.
///
/// The underlying stream enters `child`, and each update's time is converted with
/// `T::to_inner`; data and difference are passed through unchanged.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
///     let data = scope.new_collection_from(1 .. 10).1;
///
///     let result = scope.region(|child| {
///         data.enter(child)
///             .leave()
///     });
///
///     data.assert_eq(&result);
/// });
/// ```
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
where
    T: Refines<<G as ScopeParent>::Timestamp>,
{
    let entered = self.inner.enter(child);
    entered
        .map(|(data, outer_time, diff)| {
            let inner_time = T::to_inner(outer_time);
            (data, inner_time, diff)
        })
        .as_collection()
}

/// Brings a Collection into a nested scope, at varying times.
///
/// The `initial` function indicates the time at which each element of the Collection should appear.
Expand Down Expand Up @@ -558,8 +594,9 @@ use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;

/// Methods requiring a nested scope.
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection<Child<'a, G, T>, D, R, C>
where
C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
T: Refines<<G as ScopeParent>::Timestamp>,
{
/// Returns the final value of a Collection from a nested scope to its containing scope.
Expand All @@ -582,10 +619,13 @@ where
/// data.assert_eq(&result);
/// });
/// ```
pub fn leave(&self) -> Collection<G, D, R> {
pub fn leave(&self) -> Collection<G, D, R, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
use timely::dataflow::channels::pact::Pipeline;
self.inner
.leave()
.map(|(data, time, diff)| (data, time.to_outer(), diff))
.unary(Pipeline, "Leave", move |_,_| move |input, output| {
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
})
.as_collection()
}
}
Expand Down Expand Up @@ -690,3 +730,64 @@ where
.concatenate(iterator.into_iter().map(|x| x.inner))
.as_collection()
}

/// Traits that can be implemented by containers to provide functionality to collections based on them.
///
/// Each trait comes with an implementation for `Vec<(D, T, R)>`, a vector of
/// `(data, time, diff)` update triples.
pub mod containers {

    use timely::progress::{Timestamp, timestamp::Refines};
    use crate::collection::Abelian;

    /// A container that can negate its updates.
    pub trait Negate {
        /// Negates Abelian differences of each update.
        fn negate(self) -> Self;
    }
    impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
        fn negate(mut self) -> Self {
            // Only the difference of each triple is modified, in place.
            self.iter_mut().for_each(|(_data, _time, diff)| diff.negate());
            self
        }
    }

    /// A container that can enter from timestamp `T1` into timestamp `T2`.
    pub trait Enter<T1, T2> {
        /// The resulting container type.
        type InnerContainer;
        /// Update timestamps from `T1` to `T2`.
        fn enter(self) -> Self::InnerContainer;
    }
    impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
        type InnerContainer = Vec<(D, T2, R)>;
        fn enter(self) -> Self::InnerContainer {
            self.into_iter()
                .map(|(data, outer_time, diff)| {
                    let inner_time = T2::to_inner(outer_time);
                    (data, inner_time, diff)
                })
                .collect()
        }
    }

    /// A container that can leave from timestamp `T1` into timestamp `T2`.
    pub trait Leave<T1, T2> {
        /// The resulting container type.
        type OuterContainer;
        /// Update timestamps from `T1` to `T2`.
        fn leave(self) -> Self::OuterContainer;
    }
    impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
        type OuterContainer = Vec<(D, T2, R)>;
        fn leave(self) -> Self::OuterContainer {
            self.into_iter()
                .map(|(data, inner_time, diff)| {
                    let outer_time = inner_time.to_outer();
                    (data, outer_time, diff)
                })
                .collect()
        }
    }

    /// A container that can advance timestamps by a summary `TS`.
    pub trait ResultsIn<TS> {
        /// Advance times in the container by `step`.
        fn results_in(self, step: &TS) -> Self;
    }
    impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
        fn results_in(self, step: &T::Summary) -> Self {
            use timely::progress::PathSummary;
            self.into_iter()
                .filter_map(|(data, time, diff)| {
                    // Updates for which `step` yields no resulting time are discarded.
                    let advanced = step.results_in(&time)?;
                    Some((data, advanced, diff))
                })
                .collect()
        }
    }
}
Loading