diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 64a9f1ae2..ca4e30e01 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,6 +25,8 @@ jobs: toolchain: ${{ matrix.toolchain }} - name: Cargo test run: cargo test --workspace --all-targets + - name: Cargo doc test + run: cargo test --doc # Check formatting with rustfmt mdbook: diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs new file mode 100644 index 000000000..df29fd009 --- /dev/null +++ b/differential-dataflow/examples/iterate_container.rs @@ -0,0 +1,84 @@ +//! Show an iterative scope example that uses a wrapper type around a container + +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::Operator; +use timely::order::Product; +use timely::dataflow::{Scope, StreamCore}; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use differential_dataflow::{AsCollection, Collection}; +use differential_dataflow::input::Input; +use differential_dataflow::operators::iterate::Variable; +use differential_dataflow::collection::containers::{Enter, Leave, Negate, ResultsIn}; + +/// A wrapper around a container that implements the necessary traits to be used in iterative scopes. +#[derive(Clone, Default)] +struct ContainerWrapper(C); + +impl timely::container::Accountable for ContainerWrapper { + #[inline(always)] fn record_count(&self) -> i64 { self.0.record_count() } + #[inline(always)] fn is_empty(&self) -> bool { self.0.is_empty() } +} +impl, T1, T2> Enter for ContainerWrapper { + type InnerContainer = ContainerWrapper; + #[inline(always)] fn enter(self) -> Self::InnerContainer { ContainerWrapper(self.0.enter()) } +} +impl, T1, T2> Leave for ContainerWrapper { + type OuterContainer = ContainerWrapper; + #[inline(always)] fn leave(self) -> Self::OuterContainer { ContainerWrapper(self.0.leave()) } +} +impl Negate for ContainerWrapper { + #[inline(always)] fn negate(self) -> Self { ContainerWrapper(self.0.negate()) } +} +impl, TS> ResultsIn for ContainerWrapper { + #[inline(always)] fn results_in(self, step: &TS) -> Self { ContainerWrapper(self.0.results_in(step)) } +} + +fn wrap(stream: &StreamCore) -> StreamCore> { + let mut builder = OperatorBuilder::new("Wrap".to_string(), stream.scope()); + let (mut output, stream_out) = builder.new_output(); + let mut input = builder.new_input(stream, Pipeline); + builder.build(move |_capability| move |_frontier| { + let mut output = output.activate(); + input.for_each(|time, data| { + let mut session = output.session(&time); + session.give_container(&mut ContainerWrapper(std::mem::take(data))); + }); + }); + stream_out +} + + +fn main() { + timely::example(|scope| { + + let numbers = scope.new_collection_from(1 .. 10u32).1; + let numbers: Collection<_, u32, isize, _> = wrap(&numbers.inner).as_collection(); + + scope.iterative::(|nested| { + let summary = Product::new(Default::default(), 1); + let variable = Variable::new_from(numbers.enter(nested), summary); + let mapped: Collection<_, u32, isize, _> = variable.inner.unary(Pipeline, "Map", |_,_| { + |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + for (x, _t, _d) in data.0.iter_mut() { + *x = if *x % 2 == 0 { *x/2 } else { *x }; + } + session.give_container(data); + }); + } + }).as_collection(); + let result = mapped.inner.unary(Pipeline, "Unwrap", |_,_| { + |input, output| { + input.for_each(|time, data| { + let mut session = output.session(&time); + session.give_container(&mut data.0); + }); + } + }).as_collection().consolidate(); + let result = wrap(&result.inner).as_collection(); + variable.set(&result) + .leave() + }); + }) +} diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 69847a081..8236b7257 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -204,13 +204,79 @@ impl Collection { /// .assert_eq(&evens); /// }); /// ``` - // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect - // an inherent method on `Collection`. - pub fn negate(&self) -> Collection - where - StreamCore: crate::operators::Negate + pub fn negate(&self) -> Collection where C: containers::Negate { + use timely::dataflow::channels::pact::Pipeline; + self.inner + .unary(Pipeline, "Negate", move |_,_| move |input, output| { + input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate())); + }) + .as_collection() + } + + /// Brings a Collection into a nested scope. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.region(|child| { + /// data.enter(child) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R, ::Timestamp, T>>::InnerContainer> + where + C: containers::Enter<::Timestamp, T, InnerContainer: Container>, + T: Refines<::Timestamp>, + { + use timely::dataflow::channels::pact::Pipeline; + self.inner + .enter(child) + .unary(Pipeline, "Enter", move |_,_| move |input, output| { + input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter())); + }) + .as_collection() + } + + /// Advances a timestamp in the stream according to the timestamp actions on the path. + /// + /// The path may advance the timestamp sufficiently that it is no longer valid, for example if + /// incrementing fields would result in integer overflow. In this case, the record is dropped. + /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen}; + /// + /// use differential_dataflow::input::Input; + /// + /// timely::example(|scope| { + /// let summary1 = 5; + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// /// Applies `results_in` on every timestamp in the collection. + /// data.results_in(summary1); + /// }); + /// ``` + pub fn results_in(&self, step: ::Summary) -> Self + where + C: containers::ResultsIn<::Summary>, { - crate::operators::Negate::negate(&self.inner).as_collection() + use timely::dataflow::channels::pact::Pipeline; + self.inner + .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| { + input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step))); + }) + .as_collection() } } @@ -379,36 +445,6 @@ impl Collection { .as_collection() } - /// Brings a Collection into a nested scope. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::Scope; - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let result = scope.region(|child| { - /// data.enter(child) - /// .leave() - /// }); - /// - /// data.assert_eq(&result); - /// }); - /// ``` - pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R> - where - T: Refines<::Timestamp>, - { - self.inner - .enter(child) - .map(|(data, time, diff)| (data, T::to_inner(time), diff)) - .as_collection() - } - /// Brings a Collection into a nested scope, at varying times. /// /// The `initial` function indicates the time at which each element of the Collection should appear. @@ -558,8 +594,9 @@ use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; /// Methods requiring a nested scope. -impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection, D, R> +impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection, D, R, C> where + C: containers::Leave, T: Refines<::Timestamp>, { /// Returns the final value of a Collection from a nested scope to its containing scope. @@ -582,10 +619,13 @@ where /// data.assert_eq(&result); /// }); /// ``` - pub fn leave(&self) -> Collection { + pub fn leave(&self) -> Collection>::OuterContainer> { + use timely::dataflow::channels::pact::Pipeline; self.inner .leave() - .map(|(data, time, diff)| (data, time.to_outer(), diff)) + .unary(Pipeline, "Leave", move |_,_| move |input, output| { + input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave())); + }) .as_collection() } } @@ -690,3 +730,64 @@ where .concatenate(iterator.into_iter().map(|x| x.inner)) .as_collection() } + +/// Traits that can be implemented by containers to provide functionality to collections based on them. +pub mod containers { + + use timely::progress::{Timestamp, timestamp::Refines}; + use crate::collection::Abelian; + + /// A container that can negate its updates. + pub trait Negate { + /// Negates Abelian differences of each update. + fn negate(self) -> Self; + } + impl Negate for Vec<(D, T, R)> { + fn negate(mut self) -> Self { + for (_data, _time, diff) in self.iter_mut() { + diff.negate(); + } + self + } + } + + /// A container that can enter from timestamp `T1` into timestamp `T2`. + pub trait Enter { + /// The resulting container type. + type InnerContainer; + /// Update timestamps from `T1` to `T2`. + fn enter(self) -> Self::InnerContainer; + } + impl, R> Enter for Vec<(D, T1, R)> { + type InnerContainer = Vec<(D, T2, R)>; + fn enter(self) -> Self::InnerContainer { + self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect() + } + } + + /// A container that can leave from timestamp `T1` into timestamp `T2`. + pub trait Leave { + /// The resulting container type. + type OuterContainer; + /// Update timestamps from `T1` to `T2`. + fn leave(self) -> Self::OuterContainer; + } + impl, T2: Timestamp, R> Leave for Vec<(D, T1, R)> { + type OuterContainer = Vec<(D, T2, R)>; + fn leave(self) -> Self::OuterContainer { + self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect() + } + } + + /// A container that can advance timestamps by a summary `TS`. + pub trait ResultsIn { + /// Advance times in the container by `step`. + fn results_in(self, step: &TS) -> Self; + } + impl ResultsIn for Vec<(D, T, R)> { + fn results_in(self, step: &T::Summary) -> Self { + use timely::progress::PathSummary; + self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect() + } + } +} diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 9bb4c55f6..57651e68e 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -34,7 +34,7 @@ use std::fmt::Debug; use std::ops::Deref; use timely::Container; -use timely::progress::{Timestamp, PathSummary}; +use timely::progress::Timestamp; use timely::order::Product; use timely::dataflow::*; @@ -42,7 +42,7 @@ use timely::dataflow::scopes::child::Iterative; use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::feedback::Handle; -use crate::{Data, Collection, AsCollection}; +use crate::{Data, Collection}; use crate::difference::{Semigroup, Abelian}; use crate::lattice::Lattice; @@ -167,7 +167,7 @@ where impl Variable where G: Scope, - StreamCore: crate::operators::Negate + ResultsIn, + C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<::Summary>, { /// Creates a new initially empty `Variable`. /// @@ -210,8 +210,8 @@ where pub fn set_concat(self, result: &Collection) -> Collection { let step = self.step; result - .inner .results_in(step) + .inner .connect_loop(self.feedback); self.collection @@ -246,7 +246,7 @@ where impl SemigroupVariable where G: Scope, - StreamCore: ResultsIn, + C: crate::collection::containers::ResultsIn<::Summary>, { /// Creates a new initially empty `SemigroupVariable`. pub fn new(scope: &mut G, step: ::Summary) -> Self { @@ -259,8 +259,8 @@ where pub fn set(self, result: &Collection) -> Collection { let step = self.step; result - .inner .results_in(step) + .inner .connect_loop(self.feedback); self.collection @@ -273,47 +273,3 @@ impl Deref for SemigroupVariable< &self.collection } } - -/// Extension trait for streams. -pub trait ResultsIn { - /// Advances a timestamp in the stream according to the timestamp actions on the path. - /// - /// The path may advance the timestamp sufficiently that it is no longer valid, for example if - /// incrementing fields would result in integer overflow. In this case, the record is dropped. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen}; - /// - /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::ResultsIn; - /// - /// timely::example(|scope| { - /// let summary1 = 5; - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// /// Applies `results_in` on every timestamp in the collection. - /// data.results_in(summary1); - /// }); - /// ``` - fn results_in(&self, step: ::Summary) -> Self; -} - -impl ResultsIn for Collection -where - G: Scope, - C: Clone, - StreamCore: ResultsIn, -{ - fn results_in(&self, step: ::Summary) -> Self { - self.inner.results_in(step).as_collection() - } -} - -impl ResultsIn> for Stream { - fn results_in(&self, step: ::Summary) -> Self { - use timely::dataflow::operators::Map; - self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d))) - } -} diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 6112a840f..bba1ca605 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -4,15 +4,13 @@ //! operators have specialized implementations to make them work efficiently, and are in addition //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). -pub use self::negate::Negate; pub use self::reduce::{Reduce, Threshold, Count}; -pub use self::iterate::{Iterate, ResultsIn}; +pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; pub use self::threshold::ThresholdTotal; pub mod arrange; -pub mod negate; pub mod reduce; pub mod consolidate; pub mod iterate; diff --git a/differential-dataflow/src/operators/negate.rs b/differential-dataflow/src/operators/negate.rs deleted file mode 100644 index b354c95ca..000000000 --- a/differential-dataflow/src/operators/negate.rs +++ /dev/null @@ -1,54 +0,0 @@ -//! Negate the diffs of collections and streams. - -use timely::Data; -use timely::dataflow::{Scope, Stream, StreamCore}; -use timely::dataflow::operators::Map; - -use crate::{AsCollection, Collection}; -use crate::difference::Abelian; - -/// Negate the contents of a stream. -pub trait Negate { - /// Creates a new collection whose counts are the negation of those in the input. - /// - /// This method is most commonly used with `concat` to get those element in one collection but not another. - /// However, differential dataflow computations are still defined for all values of the difference type `R`, - /// including negative counts. - /// - /// # Examples - /// - /// ``` - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let odds = data.filter(|x| x % 2 == 1); - /// let evens = data.filter(|x| x % 2 == 0); - /// - /// odds.negate() - /// .concat(&data) - /// .assert_eq(&evens); - /// }); - /// ``` - fn negate(&self) -> Self; -} - -impl Negate for Collection -where - G: Scope, - C: Clone, - StreamCore: Negate, -{ - fn negate(&self) -> Self { - self.inner.negate().as_collection() - } -} - -impl Negate> for Stream { - fn negate(&self) -> Self { - self.map_in_place(|x| x.2.negate()) - } -} -