From f17921a8b56c4e6793601083682c7bb4de66b6b9 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Feb 2026 17:10:43 -0500 Subject: [PATCH 01/10] Tee implementation without Clone requirement --- timely/src/dataflow/channels/pushers/tee.rs | 183 ++++++++++++-------- timely/src/dataflow/stream.rs | 2 +- 2 files changed, 110 insertions(+), 75 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 14e2c07ef..7a1955d81 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -1,4 +1,9 @@ -//! A `Push` implementor with a list of `Box` to forward pushes to. +//! The `Tee` and `TeeHelper` types, by which stream consumers rendezvous with the producer. +//! +//! The design is a shared list of `Box>` types, which are added to by `TeeHelper` +//! and pushed into by the `Tee` once the dataflow is running. Some care is taken so that `T` +//! does not need to implement `Clone`, other than for the instantiation of a (boxed) variant +//! that supports multiple consumers, to avoid the constraint for single-consumer streams. use std::cell::RefCell; use std::fmt::{self, Debug}; @@ -9,106 +14,136 @@ use crate::dataflow::channels::Message; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +use push_set::{PushSet, PushOne, PushMany, MessagePusher}; +mod push_set { -/// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. 
-pub struct Tee { - buffer: C, - shared: PushList, -} + use crate::dataflow::channels::Message; -impl Push> for Tee { - #[inline] - fn push(&mut self, message: &mut Option>) { - let mut pushers = self.shared.borrow_mut(); - if let Some(message) = message { - for index in 1..pushers.len() { - self.buffer.clone_from(&message.data); - Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]); + use crate::communication::Push; + use crate::Container; + + + /// A type that can be pushed at, and which may be able to accommodate a similar pusher. + /// + /// This trait exists to support fanning out of pushers when the data may not be `Clone`, + /// allowing the implementation for multiple pushers (which may require cloning) to be + /// behind an abstraction. + pub trait PushSet : Push { + fn insert(&mut self, other: Box>) -> Result<(), Box>>; + } + + /// A `Push` wrapper that implements `PushOne`. + pub struct PushOne
<P> { inner: P }
+    impl<T, P: Push<T>> Push<T> for PushOne<P> {
+        fn push(&mut self, item: &mut Option<T>) { self.inner.push(item) }
+    }
+    impl<T, P: Push<T> + 'static> PushSet<T> for PushOne<P> {
+        fn insert(&mut self, other: Box<dyn Push<T>>) -> Result<(), Box<dyn Push<T>>> { Err(other) }
+    }
+    impl<P> From<P> for PushOne<P>
{ fn from(inner: P) -> Self { Self { inner } } } + + /// A `Push` wrapper for a list of boxed implementors. + pub struct PushMany { + /// Used to clone into, for the chance to avoid continual re-allocation. + buffer: Option, + /// The intended recipients of pushed values. + list: Vec>>, + } + impl Push for PushMany { + fn push(&mut self, item: &mut Option) { + // We defensively clone `element` for all but the last element of `self.list`, + // as we cannot be sure that a `Push` implementor will not modify the contents. + // Indeed, that's the goal of the `Push` trait, to allow one to take ownership. + + // This guard prevents dropping `self.buffer` when a `None` is received. + // We might prefer to do that, to reduce steady state memory. + if item.is_some() { + for pusher in self.list.iter_mut().rev().skip(1).rev() { + self.buffer.clone_from(&item); + pusher.push(&mut self.buffer); + } + if let Some(pusher) = self.list.last_mut() { + std::mem::swap(&mut self.buffer, item); + pusher.push(&mut self.buffer); + } } + else { for pusher in self.list.iter_mut() { pusher.done(); } } } - else { - for index in 1..pushers.len() { - pushers[index-1].done(); - } + } + impl PushSet for PushMany { + fn insert(&mut self, other: Box>) -> Result<(), Box>> { + self.list.push(other); + Ok(()) } - if !pushers.is_empty() { - let last = pushers.len() - 1; - pushers[last].push(message); + } + impl From>>> for PushMany { fn from(list: Vec>>) -> Self { Self { list, buffer: None } } } + + /// A temporary struct to re-present `Message::push_at` as `Push`. The intent is to delete. + pub struct MessagePusher>> { pub inner: P, pub phantom: std::marker::PhantomData<(T, C)> } + impl>> Push> for MessagePusher { + fn push(&mut self, message: &mut Option>) { + if let Some(message) = message.as_mut() { + Message::push_at(&mut message.data, message.time.clone(), &mut self.inner); + } + else { + self.inner.done() + } } } + } -impl Tee { - /// Allocates a new pair of `Tee` and `TeeHelper`. 
- pub fn new() -> (Tee, TeeHelper) { - let shared = Rc::new(RefCell::new(Vec::new())); - let port = Tee { - buffer: Default::default(), - shared: Rc::clone(&shared), - }; +/// The shared state between a `Tee` and `TeeHelper`: an extensible list of pushers. +type PushList = Rc>>>>>; - (port, TeeHelper { shared }) - } -} +/// The writing half of a shared destination for pushing at. +pub struct Tee { shared: PushList } -impl Clone for Tee { - fn clone(&self) -> Self { - Self { - buffer: Default::default(), - shared: Rc::clone(&self.shared), +impl Push> for Tee { + #[inline] + fn push(&mut self, message: &mut Option>) { + if let Some(pushee) = self.shared.borrow_mut().as_mut() { + pushee.push(message) } } } -impl Debug for Tee -where - C: Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut debug = f.debug_struct("Tee"); - debug.field("buffer", &self.buffer); - - if let Ok(shared) = self.shared.try_borrow() { - debug.field("shared", &format!("{} pushers", shared.len())); - } else { - debug.field("shared", &"..."); - } - - debug.finish() +impl Tee { + /// Allocates a new pair of `Tee` and `TeeHelper`. + pub fn new() -> (Tee, TeeHelper) { + let shared = Rc::new(RefCell::new(None)); + let port = Tee { shared: Rc::clone(&shared) }; + (port, TeeHelper { shared }) } } -/// A shared list of `Box` used to add `Push` implementors. -pub struct TeeHelper { - shared: PushList, +impl Debug for Tee { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Tee") } } -impl TeeHelper { +/// The subscribe half of a shared destination for pushing at. +pub struct TeeHelper { shared: PushList } + +impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. 
pub fn add_pusher>+'static>(&self, pusher: P) { - self.shared.borrow_mut().push(Box::new(pusher)); + let pusher = MessagePusher { inner: pusher, phantom: std::marker::PhantomData }; + let mut borrow = self.shared.borrow_mut(); + *borrow = Some(if let Some(mut inner) = borrow.take() { + if let Err(pusher) = inner.insert(Box::new(pusher)) { + let prior = inner as Box>>; + Box::new(PushMany::from(vec![prior, pusher])) + } + else { inner } + } + else { Box::new(PushOne::from(pusher)) }); } } impl Clone for TeeHelper { - fn clone(&self) -> Self { - TeeHelper { - shared: Rc::clone(&self.shared), - } - } + fn clone(&self) -> Self { TeeHelper { shared: Rc::clone(&self.shared) } } } impl Debug for TeeHelper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut debug = f.debug_struct("TeeHelper"); - - if let Ok(shared) = self.shared.try_borrow() { - debug.field("shared", &format!("{} pushers", shared.len())); - } else { - debug.field("shared", &"..."); - } - - debug.finish() - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TeeHelper") } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index b5bb93a01..5ea8dadfe 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -46,7 +46,7 @@ impl Clone for StreamCore { /// A stream batching data in vectors. pub type Stream = StreamCore>; -impl StreamCore { +impl StreamCore { /// Connects the stream to a destination. 
/// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the From e2c4ae026d7b8843250d2553f432b9f490dc3dfe Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Feb 2026 21:33:16 -0500 Subject: [PATCH 02/10] Improve trait bounds --- timely/src/dataflow/channels/mod.rs | 3 +-- timely/src/dataflow/channels/pushers/tee.rs | 7 ++----- timely/src/dataflow/stream.rs | 4 ++-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 36fb44bb1..1192735a2 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize}; use crate::communication::Push; -use crate::Container; /// A collection of types that may be pushed at. pub mod pushers; @@ -32,7 +31,7 @@ impl Message { } } -impl Message { +impl Message { /// Creates a new message instance from arguments. pub fn new(time: T, data: C, from: usize, seq: usize) -> Self { Message { time, data, from, seq } diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 7a1955d81..07d090574 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -18,10 +18,7 @@ use push_set::{PushSet, PushOne, PushMany, MessagePusher}; mod push_set { use crate::dataflow::channels::Message; - use crate::communication::Push; - use crate::Container; - /// A type that can be pushed at, and which may be able to accommodate a similar pusher. /// @@ -80,7 +77,7 @@ mod push_set { /// A temporary struct to re-present `Message::push_at` as `Push`. The intent is to delete. 
pub struct MessagePusher>> { pub inner: P, pub phantom: std::marker::PhantomData<(T, C)> } - impl>> Push> for MessagePusher { + impl>> Push> for MessagePusher { fn push(&mut self, message: &mut Option>) { if let Some(message) = message.as_mut() { Message::push_at(&mut message.data, message.time.clone(), &mut self.inner); @@ -124,7 +121,7 @@ impl Debug for Tee { /// The subscribe half of a shared destination for pushing at. pub struct TeeHelper { shared: PushList } -impl TeeHelper { +impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. pub fn add_pusher>+'static>(&self, pusher: P) { let pusher = MessagePusher { inner: pusher, phantom: std::marker::PhantomData }; diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 5ea8dadfe..c1ff2030e 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -46,12 +46,12 @@ impl Clone for StreamCore { /// A stream batching data in vectors. pub type Stream = StreamCore>; -impl StreamCore { +impl StreamCore { /// Connects the stream to a destination. /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. 
- pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) where C: Clone+Default+'static { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { From 4f05bac5925db6c33f76917e11b18366c59853c1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Feb 2026 21:38:22 -0500 Subject: [PATCH 03/10] Remove arguments from Message::new --- timely/src/dataflow/channels/mod.rs | 8 +++++--- timely/src/dataflow/operators/core/enterleave.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 1192735a2..0e4dbacf7 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -33,8 +33,10 @@ impl Message { impl Message { /// Creates a new message instance from arguments. - pub fn new(time: T, data: C, from: usize, seq: usize) -> Self { - Message { time, data, from, seq } + /// + /// Zero values are installed for `from` and `seq`, and are meant to be populated by `LogPusher`. + pub fn new(time: T, data: C) -> Self { + Message { time, data, from: 0, seq: 0 } } /// Forms a message, and pushes contents at `pusher`. 
Replaces `buffer` with what the pusher @@ -43,7 +45,7 @@ impl Message { pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); - let message = Message::new(time, data, 0, 0); + let message = Message::new(time, data); let mut bundle = Some(message); pusher.push(&mut bundle); diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 14787459b..1d1e2cbe6 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -141,7 +141,7 @@ impl, TContainer: Container fn push(&mut self, element: &mut Option>) { if let Some(outer_message) = element { let data = ::std::mem::take(&mut outer_message.data); - let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0)); + let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data)); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { outer_message.data = inner_message.data; @@ -169,7 +169,7 @@ where TOuter: Timestamp, TInner: Timestamp+Refines, { fn push(&mut self, message: &mut Option>) { if let Some(inner_message) = message { let data = ::std::mem::take(&mut inner_message.data); - let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data, 0, 0)); + let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data)); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { inner_message.data = outer_message.data; From 6a90ff1762d8f3335f64474eef0e6525f5ab9e3f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 00:43:28 -0500 Subject: [PATCH 04/10] Update MSRV to 1.86 for dyn upcasts --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a7d320464..d26d95059 100644 --- 
a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: - windows toolchain: - stable - - 1.85 + - 1.86 name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: From 9bca139550fb75ac396c76a5f435c4369eb0e688 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 00:47:41 -0500 Subject: [PATCH 05/10] Introduce TeeHelper::upgrade --- timely/src/dataflow/channels/mod.rs | 8 +-- timely/src/dataflow/channels/pushers/tee.rs | 62 ++++++++++--------- .../dataflow/operators/core/capture/replay.rs | 4 +- timely/src/dataflow/stream.rs | 2 +- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 0e4dbacf7..ce109363c 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -31,7 +31,7 @@ impl Message { } } -impl Message { +impl Message { /// Creates a new message instance from arguments. /// /// Zero values are installed for `from` and `seq`, and are meant to be populated by `LogPusher`. @@ -39,10 +39,10 @@ impl Message { Message { time, data, from: 0, seq: 0 } } - /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher - /// leaves in place, or the container's default element. The buffer is left in an undefined state. + /// Forms a message from borrowed parts, and replaces `buffer` with what is left by the `push` call. + /// If the pusher returns nothing, then `buffer` is set to the default for the container. 
#[inline] - pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) where C: Default { let data = ::std::mem::take(buffer); let message = Message::new(time, data); diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 07d090574..d4a518181 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -14,10 +14,9 @@ use crate::dataflow::channels::Message; use crate::communication::Push; use crate::{Container, Data}; -use push_set::{PushSet, PushOne, PushMany, MessagePusher}; +use push_set::{PushSet, PushOne, PushMany}; mod push_set { - use crate::dataflow::channels::Message; use crate::communication::Push; /// A type that can be pushed at, and which may be able to accommodate a similar pusher. @@ -26,7 +25,8 @@ mod push_set { /// allowing the implementation for multiple pushers (which may require cloning) to be /// behind an abstraction. pub trait PushSet : Push { - fn insert(&mut self, other: Box>) -> Result<(), Box>>; + /// If a list of boxed pushers, that list. + fn as_list(&mut self) -> Option<&mut Vec>>>; } /// A `Push` wrapper that implements `PushOne`. @@ -35,7 +35,7 @@ mod push_set { fn push(&mut self, item: &mut Option) { self.inner.push(item) } } impl + 'static> PushSet for PushOne
<P> {
-        fn insert(&mut self, other: Box<dyn Push<T>>) -> Result<(), Box<dyn Push<T>>> { Err(other) }
+        fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { None }
     }
     impl<P> From<P> for PushOne<P>
{ fn from(inner: P) -> Self { Self { inner } } } @@ -68,26 +68,10 @@ mod push_set { } } impl PushSet for PushMany { - fn insert(&mut self, other: Box>) -> Result<(), Box>> { - self.list.push(other); - Ok(()) - } + fn as_list(&mut self) -> Option<&mut Vec>>> { Some(&mut self.list) } } impl From>>> for PushMany { fn from(list: Vec>>) -> Self { Self { list, buffer: None } } } - /// A temporary struct to re-present `Message::push_at` as `Push`. The intent is to delete. - pub struct MessagePusher>> { pub inner: P, pub phantom: std::marker::PhantomData<(T, C)> } - impl>> Push> for MessagePusher { - fn push(&mut self, message: &mut Option>) { - if let Some(message) = message.as_mut() { - Message::push_at(&mut message.data, message.time.clone(), &mut self.inner); - } - else { - self.inner.done() - } - } - } - } /// The shared state between a `Tee` and `TeeHelper`: an extensible list of pushers. @@ -121,19 +105,37 @@ impl Debug for Tee { /// The subscribe half of a shared destination for pushing at. pub struct TeeHelper { shared: PushList } -impl TeeHelper { +impl TeeHelper { + /// Upgrades the shared list to one that supports cloning. + /// + /// This method "teaches" the `Tee` how to clone containers, which enables adding multiple pushers. + /// It introduces the cost of one additional virtual call through a boxed trait, so one should not + /// upgrade for no reason. + pub fn upgrade(&self) where C: Clone { + let mut borrow = self.shared.borrow_mut(); + if let Some(mut pusher) = borrow.take() { + if pusher.as_list().is_none() { + *borrow = Some(Box::new(PushMany::from(vec![pusher as Box>>]))); + } + else { + *borrow = Some(pusher); + } + } + else { + *borrow = Some(Box::new(PushMany::from(vec![]))); + } + } + /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. 
pub fn add_pusher>+'static>(&self, pusher: P) { - let pusher = MessagePusher { inner: pusher, phantom: std::marker::PhantomData }; + if self.shared.borrow().is_some() { self.upgrade(); } let mut borrow = self.shared.borrow_mut(); - *borrow = Some(if let Some(mut inner) = borrow.take() { - if let Err(pusher) = inner.insert(Box::new(pusher)) { - let prior = inner as Box>>; - Box::new(PushMany::from(vec![prior, pusher])) - } - else { inner } + if let Some(many) = borrow.as_mut() { + many.as_list().unwrap().push(Box::new(pusher)) + } + else { + *borrow = Some(Box::new(PushOne::from(pusher))); } - else { Box::new(PushOne::from(pusher)) }); } } diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 70a4d9c1b..ec623abf0 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -99,8 +99,8 @@ where Owned(Event::Progress(vec)) => { progress.internals[0].extend(vec.into_iter()); }, - Owned(Event::Messages(time, mut data)) => { - Message::push_at(&mut data, time, &mut output); + Owned(Event::Messages(time, data)) => { + output.push(&mut Some(Message::new(time, data))); } Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index c1ff2030e..0ae970164 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -51,7 +51,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. 
- pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) where C: Clone+Default+'static { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) where C: Clone+'static { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { From b2fd209da18e86bb7e4d6b16e8c11a9f81e13edc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 01:29:08 -0500 Subject: [PATCH 06/10] Make StreamCore connection consume self --- mdbook/src/chapter_2/chapter_2_3.md | 19 ++++++----- mdbook/src/chapter_2/chapter_2_4.md | 4 +-- mdbook/src/chapter_4/chapter_4_2.md | 14 ++++---- mdbook/src/chapter_4/chapter_4_3.md | 2 +- timely/examples/bfs.rs | 4 +-- timely/examples/hashjoin.rs | 2 +- timely/examples/loopdemo.rs | 4 +-- timely/examples/pagerank.rs | 2 +- timely/examples/pingpong.rs | 2 +- timely/examples/unionfind.rs | 4 +-- timely/src/dataflow/channels/pushers/tee.rs | 13 +++---- .../operators/aggregation/aggregate.rs | 4 +-- .../operators/aggregation/state_machine.rs | 4 +-- timely/src/dataflow/operators/branch.rs | 8 ++--- timely/src/dataflow/operators/broadcast.rs | 4 +-- .../operators/core/capture/capture.rs | 8 ++--- timely/src/dataflow/operators/core/concat.rs | 8 ++--- .../src/dataflow/operators/core/enterleave.rs | 8 ++--- .../src/dataflow/operators/core/exchange.rs | 4 +-- .../src/dataflow/operators/core/feedback.rs | 10 +++--- timely/src/dataflow/operators/core/filter.rs | 4 +-- timely/src/dataflow/operators/core/inspect.rs | 14 ++++---- timely/src/dataflow/operators/core/map.rs | 20 +++++------ timely/src/dataflow/operators/core/ok_err.rs | 4 +-- .../src/dataflow/operators/core/partition.rs | 4 +-- timely/src/dataflow/operators/core/probe.rs | 8 ++--- timely/src/dataflow/operators/core/rc.rs | 6 ++-- timely/src/dataflow/operators/core/reclock.rs | 6 ++-- timely/src/dataflow/operators/count.rs | 8 ++--- timely/src/dataflow/operators/delay.rs | 12 +++---- 
timely/src/dataflow/operators/filter.rs | 4 +-- .../dataflow/operators/generic/builder_raw.rs | 4 +-- .../dataflow/operators/generic/builder_rc.rs | 4 +-- .../dataflow/operators/generic/notificator.rs | 2 +- .../dataflow/operators/generic/operator.rs | 34 +++++++++---------- timely/src/dataflow/operators/map.rs | 12 +++---- timely/src/dataflow/operators/partition.rs | 14 ++++---- timely/src/dataflow/operators/result.rs | 24 ++++++------- timely/src/dataflow/stream.rs | 4 +-- timely/tests/shape_scaling.rs | 2 +- 40 files changed, 160 insertions(+), 158 deletions(-) diff --git a/mdbook/src/chapter_2/chapter_2_3.md b/mdbook/src/chapter_2/chapter_2_3.md index e51fc213b..5ca859a33 100644 --- a/mdbook/src/chapter_2/chapter_2_3.md +++ b/mdbook/src/chapter_2/chapter_2_3.md @@ -126,12 +126,12 @@ use timely::dataflow::operators::{ToStream, Partition, Inspect}; fn main() { timely::example(|scope| { - let streams = (0..10).to_stream(scope) - .partition(3, |x| (x % 3, x)); + let mut streams = (0..10).to_stream(scope) + .partition(3, |x| (x % 3, x)); - streams[0].inspect(|x| println!("seen 0: {:?}", x)); - streams[1].inspect(|x| println!("seen 1: {:?}", x)); - streams[2].inspect(|x| println!("seen 2: {:?}", x)); + streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x)); + streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x)); + streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x)); }); } ``` @@ -147,11 +147,12 @@ use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect}; fn main() { timely::example(|scope| { - let streams = (0..10).to_stream(scope) + let mut streams = (0..10).to_stream(scope) .partition(3, |x| (x % 3, x)); - streams[0] - .concat(&streams[1]) - .concat(&streams[2]) + streams + .pop().unwrap() + .concat(streams.pop().unwrap()) + .concat(streams.pop().unwrap()) .inspect(|x| println!("seen: {:?}", x)); }); } diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index a7bf1a32b..3fa4d2f59 
100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -182,7 +182,7 @@ fn main() { let in1 = (0 .. 10).to_stream(scope); let in2 = (0 .. 10).to_stream(scope); - in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { + in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { let mut notificator = FrontierNotificator::default(); let mut stash = HashMap::new(); @@ -233,7 +233,7 @@ fn main() { let in1 = (0 .. 10).to_stream(scope); let in2 = (0 .. 10).to_stream(scope); - in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { + in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { let mut stash = HashMap::new(); diff --git a/mdbook/src/chapter_4/chapter_4_2.md b/mdbook/src/chapter_4/chapter_4_2.md index 7c057da38..45230367a 100644 --- a/mdbook/src/chapter_4/chapter_4_2.md +++ b/mdbook/src/chapter_4/chapter_4_2.md @@ -24,7 +24,7 @@ fn main() { // circulate numbers, Collatz stepping each time. (1 .. 10) .to_stream(scope) - .concat(&stream) + .concat(stream) .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } ) .inspect(|x| println!("{:?}", x)) .filter(|x| *x != 1) @@ -63,17 +63,17 @@ fn main() { let results1 = stream1.map(|x| 3 * x + 1); // partition the input and feedback streams by even-ness. - let parts = + let mut parts = (1 .. 10) .to_stream(scope) - .concat(&results0) - .concat(&results1) + .concat(results0) + .concat(results1) .inspect(|x| println!("{:?}", x)) .partition(2, |x| (x % 2, x)); // connect each part appropriately. 
- parts[0].connect_loop(handle0); - parts[1].connect_loop(handle1); + parts.pop().unwrap().connect_loop(handle1); + parts.pop().unwrap().connect_loop(handle0); }); } ``` @@ -103,7 +103,7 @@ fn main() { input .enter(subscope) - .concat(&stream) + .concat(stream) .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } ) .inspect(|x| println!("{:?}", x)) .filter(|x| *x != 1) diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index f543385df..9fd014ac0 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -76,7 +76,7 @@ fn main() { // Assign timestamps to records so that not much work is in each time. .delay(|number, time| number / 100 ) // Buffer records until all prior timestamps have completed. - .binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| { + .binary_frontier(cycle, Pipeline, Pipeline, "Buffer", move |capability, info| { move |(input1, frontier1), (input2, frontier2), output| { diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 371b5ff41..cdae8359e 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -45,7 +45,7 @@ fn main() { // use the stream of edges graph.binary_notify( - &stream, + stream, Exchange::new(|x: &(u32, u32)| u64::from(x.0)), Exchange::new(|x: &(u32, u32)| u64::from(x.0)), "BFS", @@ -130,7 +130,7 @@ fn main() { }); } ) - .concat(&(0..1).map(|x| (x,x)).to_stream(scope)) + .concat((0..1).map(|x| (x,x)).to_stream(scope)) .connect_loop(handle); }); }).unwrap(); // asserts error-free execution; diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index 6719af3b5..ae4dfb683 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -32,7 +32,7 @@ fn main() { let exchange2 = Exchange::new(|x: &(u64, u64)| x.0); stream1 - .binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| { + .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| { let mut map1 = 
HashMap::>::new(); let mut map2 = HashMap::>::new(); diff --git a/timely/examples/loopdemo.rs b/timely/examples/loopdemo.rs index 132f66a6c..8875ca1c1 100644 --- a/timely/examples/loopdemo.rs +++ b/timely/examples/loopdemo.rs @@ -27,11 +27,11 @@ fn main() { let step = stream - .concat(&loop_stream) + .concat(loop_stream) .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 }) .filter(|x| x > &1); - step.connect_loop(loop_handle); + step.clone().connect_loop(loop_handle); step.probe_with(&probe); }); diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 293df687e..656a4b5da 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -23,7 +23,7 @@ fn main() { // bring edges and ranks together! let changes = edge_stream.binary_frontier( - &rank_stream, + rank_stream, Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64), Exchange::new(|x: &(usize, i64)| x.0 as u64), "PageRank", diff --git a/timely/examples/pingpong.rs b/timely/examples/pingpong.rs index 2104de289..edd644d38 100644 --- a/timely/examples/pingpong.rs +++ b/timely/examples/pingpong.rs @@ -14,7 +14,7 @@ fn main() { (0 .. 
elements) .filter(move |&x| (x as usize) % peers == index) .to_stream(scope) - .concat(&cycle) + .concat(cycle) .exchange(|&x| x) .map_in_place(|x| *x += 1) .branch_when(move |t| t < &iterations).1 diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index da9914c99..841a4f465 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -48,11 +48,11 @@ fn main() { } trait UnionFind { - fn union_find(&self) -> Self; + fn union_find(self) -> Self; } impl UnionFind for Stream { - fn union_find(&self) -> Stream { + fn union_find(self) -> Stream { self.unary(Pipeline, "UnionFind", |_,_| { diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index d4a518181..e5842ab4d 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -103,15 +103,17 @@ impl Debug for Tee { } /// The subscribe half of a shared destination for pushing at. +/// +/// Cloning a `TeeHelper` will upgrade it, teaching the shared list how to clone containers. pub struct TeeHelper { shared: PushList } -impl TeeHelper { +impl TeeHelper { /// Upgrades the shared list to one that supports cloning. /// /// This method "teaches" the `Tee` how to clone containers, which enables adding multiple pushers. /// It introduces the cost of one additional virtual call through a boxed trait, so one should not /// upgrade for no reason. - pub fn upgrade(&self) where C: Clone { + pub fn upgrade(&self) where T: Clone, C: Clone { let mut borrow = self.shared.borrow_mut(); if let Some(mut pusher) = borrow.take() { if pusher.as_list().is_none() { @@ -127,8 +129,7 @@ impl TeeHelper { } /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. 
- pub fn add_pusher>+'static>(&self, pusher: P) { - if self.shared.borrow().is_some() { self.upgrade(); } + pub fn add_pusher>+'static>(self, pusher: P) { let mut borrow = self.shared.borrow_mut(); if let Some(many) = borrow.as_mut() { many.as_list().unwrap().push(Box::new(pusher)) @@ -139,8 +140,8 @@ impl TeeHelper { } } -impl Clone for TeeHelper { - fn clone(&self) -> Self { TeeHelper { shared: Rc::clone(&self.shared) } } +impl Clone for TeeHelper { + fn clone(&self) -> Self { self.upgrade(); TeeHelper { shared: Rc::clone(&self.shared) } } } impl Debug for TeeHelper { diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index aabe3d62b..60a56fcff 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -61,7 +61,7 @@ pub trait Aggregate { /// }); /// ``` fn aggregateR+'static, H: Fn(&K)->u64+'static>( - &self, + self, fold: F, emit: E, hash: H) -> Stream where S::Timestamp: Eq; @@ -70,7 +70,7 @@ pub trait Aggregate { impl, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate for Stream { fn aggregateR+'static, H: Fn(&K)->u64+'static>( - &self, + self, fold: F, emit: E, hash: H) -> Stream where S::Timestamp: Eq { diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index 0cd2d62d1..7bd67cc82 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -51,7 +51,7 @@ pub trait StateMachine { I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(&self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq ; + >(self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq ; } impl StateMachine for Stream { @@ -61,7 +61,7 @@ impl StateMachine f I: 
IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(&self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq { + >(self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq { let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state) let mut states = HashMap::new(); // keys -> state diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index ed9e1fde7..46f224e9e 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -30,14 +30,14 @@ pub trait Branch { /// }); /// ``` fn branch( - &self, + self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (Stream, Stream); } impl Branch for Stream { fn branch( - &self, + self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (Stream, Stream) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); @@ -94,11 +94,11 @@ pub trait BranchWhen: Sized { /// after_five.inspect(|x| println!("Times 5 and later: {:?}", x)); /// }); /// ``` - fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); + fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } impl BranchWhen for StreamCore { - fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { + fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); builder.set_notify(false); diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs index 21d80d6da..8d99332e7 100644 --- a/timely/src/dataflow/operators/broadcast.rs +++ b/timely/src/dataflow/operators/broadcast.rs @@ -18,11 +18,11 @@ pub trait Broadcast { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn 
broadcast(&self) -> Self; + fn broadcast(self) -> Self; } impl Broadcast for Stream { - fn broadcast(&self) -> Stream { + fn broadcast(self) -> Stream { // NOTE: Simplified implementation due to underlying motion // in timely dataflow internals. Optimize once they have diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index 4d8e3053e..efc48506e 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -17,7 +17,7 @@ use crate::progress::Timestamp; use super::{Event, EventPusher}; /// Capture a stream of timestamped data for later replay. -pub trait Capture { +pub trait Capture : Sized { /// Captures a stream of timestamped data for later replay. /// /// # Examples @@ -103,10 +103,10 @@ pub trait Capture { /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// # } /// ``` - fn capture_into+'static>(&self, pusher: P); + fn capture_into+'static>(self, pusher: P); /// Captures a stream using Rust's MPSC channels. 
- fn capture(&self) -> ::std::sync::mpsc::Receiver> { + fn capture(self) -> ::std::sync::mpsc::Receiver> { let (send, recv) = ::std::sync::mpsc::channel(); self.capture_into(send); recv @@ -114,7 +114,7 @@ pub trait Capture { } impl Capture for StreamCore { - fn capture_into+'static>(&self, mut event_pusher: P) { + fn capture_into+'static>(self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index 84c9bc16e..c1d378372 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -15,15 +15,15 @@ pub trait Concat { /// timely::example(|scope| { /// /// let stream = (0..10).to_stream(scope); - /// stream.concat(&stream) + /// stream.clone().concat(stream) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(&self, _: &StreamCore) -> StreamCore; + fn concat(self, _: StreamCore) -> StreamCore; } impl Concat for StreamCore { - fn concat(&self, other: &StreamCore) -> StreamCore { + fn concat(self, other: StreamCore) -> StreamCore { self.scope().concatenate([self.clone(), other.clone()]) } } @@ -73,7 +73,7 @@ impl Concatenate for G { builder.set_notify(false); // create new input handles for each input stream. - let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::>(); + let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::>(); // create one output handle for the concatenated results. 
let (mut output, result) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 1d1e2cbe6..c8f77fc16 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -50,11 +50,11 @@ pub trait Enter, C> { /// }); /// }); /// ``` - fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore, C>; + fn enter<'a>(self, _: &Child<'a, G, T>) -> StreamCore, C>; } impl, C: Container> Enter for StreamCore { - fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { + fn enter<'a>(self, scope: &Child<'a, G, T>) -> StreamCore, C> { use crate::scheduling::Scheduler; @@ -100,11 +100,11 @@ pub trait Leave { /// }); /// }); /// ``` - fn leave(&self) -> StreamCore; + fn leave(self) -> StreamCore; } impl> Leave for StreamCore, C> { - fn leave(&self) -> StreamCore { + fn leave(self) -> StreamCore { let scope = self.scope(); diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index 593e3144a..cfab5f716 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -23,7 +23,7 @@ pub trait Exchange { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn exchange(&self, route: F) -> Self + fn exchange(self, route: F) -> Self where for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static; } @@ -37,7 +37,7 @@ where + crate::dataflow::channels::ContainerBytes + for<'a> PushInto>, { - fn exchange(&self, route: F) -> StreamCore + fn exchange(self, route: F) -> StreamCore where for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static, { diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index c05f5912e..2a20f1278 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -27,7 +27,7 @@ pub trait Feedback { /// // circulate 0..10 for 
100 iterations. /// let (handle, cycle) = scope.feedback(1); /// (0..10).to_stream(scope) - /// .concat(&cycle) + /// .concat(cycle) /// .inspect(|x| println!("seen: {:?}", x)) /// .branch_when(|t| t < &100).1 /// .connect_loop(handle); @@ -54,7 +54,7 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// scope.iterative::(|inner| { /// let (handle, cycle) = inner.loop_variable(1); /// (0..10).to_stream(inner) - /// .concat(&cycle) + /// .concat(cycle) /// .inspect(|x| println!("seen: {:?}", x)) /// .branch_when(|t| t.inner < 100).1 /// .connect_loop(handle); @@ -95,17 +95,17 @@ pub trait ConnectLoop { /// // circulate 0..10 for 100 iterations. /// let (handle, cycle) = scope.feedback(1); /// (0..10).to_stream(scope) - /// .concat(&cycle) + /// .concat(cycle) /// .inspect(|x| println!("seen: {:?}", x)) /// .branch_when(|t| t < &100).1 /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(&self, handle: Handle); + fn connect_loop(self, handle: Handle); } impl ConnectLoop for StreamCore { - fn connect_loop(&self, handle: Handle) { + fn connect_loop(self, handle: Handle) { let mut builder = handle.builder; let summary = handle.summary; diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index aecadd050..376cc9c09 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -20,14 +20,14 @@ pub trait Filter { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn filter)->bool+'static>(&self, predicate: P) -> Self; + fn filter)->bool+'static>(self, predicate: P) -> Self; } impl Filter for StreamCore where for<'a> C: PushInto> { - fn filter)->bool+'static>(&self, mut predicate: P) -> StreamCore { + fn filter)->bool+'static>(self, mut predicate: P) -> StreamCore { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { output.session(&time) diff --git a/timely/src/dataflow/operators/core/inspect.rs 
b/timely/src/dataflow/operators/core/inspect.rs index bb4d882cc..27ff694de 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -21,7 +21,7 @@ where /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn inspect(&self, mut func: F) -> Self + fn inspect(self, mut func: F) -> Self where F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static, { @@ -41,7 +41,7 @@ where /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` - fn inspect_time(&self, mut func: F) -> Self + fn inspect_time(self, mut func: F) -> Self where F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static, { @@ -63,7 +63,7 @@ where /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` - fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self { + fn inspect_batch(self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self { self.inspect_core(move |event| { if let Ok((time, data)) = event { func(time, data); @@ -90,14 +90,14 @@ where /// }); /// }); /// ``` - fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } impl Inspect for StreamCore where for<'a> &'a C: IntoIterator, { - fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { + fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { self.inspect_container(func) } } @@ -123,12 +123,12 @@ pub trait InspectCore { /// }); /// }); /// ``` - fn inspect_container(&self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_container(self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } impl 
InspectCore for StreamCore { - fn inspect_container(&self, mut func: F) -> StreamCore + fn inspect_container(self, mut func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { use crate::progress::timestamp::Timestamp; diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 61d9fd359..23d603e7d 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -7,7 +7,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for `Stream`. -pub trait Map { +pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -22,7 +22,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map(&self, mut logic: L) -> StreamCore + fn map(self, mut logic: L) -> StreamCore where C2: Container + SizableContainer + PushInto, L: FnMut(C::Item<'_>)->D2 + 'static, @@ -43,7 +43,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_map(&self, logic: L) -> StreamCore + fn flat_map(self, logic: L) -> StreamCore where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -76,7 +76,7 @@ pub trait Map { /// /// assert_eq!((4..14).collect::>(), data.extract()[0].1); /// ``` - fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I> + fn flat_map_builder<'t, I, L>(self, logic: L) -> FlatMapBuilder where C: Clone + 'static, L: for<'a> Fn(C::Item<'a>) -> I, @@ -90,7 +90,7 @@ impl Map for StreamCore { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. 
- fn flat_map(&self, mut logic: L) -> StreamCore + fn flat_map(self, mut logic: L) -> StreamCore where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -107,26 +107,26 @@ impl Map for StreamCore { /// A stream wrapper that allows the accumulation of flatmap logic. -pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I> +pub struct FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { - stream: &'t T, + stream: T, logic: F, marker: std::marker::PhantomData, } -impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I> +impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { /// Create a new wrapper with no action on the stream. - pub fn new(stream: &'t T, logic: F) -> Self { + pub fn new(stream: T, logic: F) -> Self { FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } } /// Transform a flatmapped stream through addiitonal logic. - pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> { + pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { let logic = self.logic; FlatMapBuilder { stream: self.stream, diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 53746c6af..0b54d20dc 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -30,7 +30,7 @@ pub trait OkErr { /// }); /// ``` fn ok_err( - &self, + self, logic: L, ) -> (StreamCore, StreamCore) where @@ -42,7 +42,7 @@ pub trait OkErr { impl OkErr for StreamCore { fn ok_err( - &self, + self, mut logic: L, ) -> (StreamCore, StreamCore) where diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index a878e33d2..9d6f32aee 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ 
-27,14 +27,14 @@ pub trait Partition { /// } /// }); /// ``` - fn partition(&self, parts: u64, route: F) -> Vec> + fn partition(self, parts: u64, route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; } impl Partition for StreamCore { - fn partition(&self, parts: u64, mut route: F) -> Vec> + fn partition(self, parts: u64, mut route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 5b8821bf8..7625e390c 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -44,7 +44,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe(&self) -> Handle; + fn probe(self) -> Handle; /// Inserts a progress probe in a stream. /// @@ -76,18 +76,18 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &Handle) -> StreamCore; + fn probe_with(self, handle: &Handle) -> StreamCore; } impl Probe for StreamCore { - fn probe(&self) -> Handle { + fn probe(self) -> Handle { // the frontier is shared state; scope updates, handle reads. 
let handle = Handle::::new(); self.probe_with(&handle); handle } - fn probe_with(&self, handle: &Handle) -> StreamCore { + fn probe_with(self, handle: &Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index f1bd9abb3..3e49e1fd7 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -21,11 +21,11 @@ pub trait SharedStream { /// .inspect_container(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn shared(&self) -> StreamCore>; + fn shared(self) -> StreamCore>; } impl SharedStream for StreamCore { - fn shared(&self) -> StreamCore> { + fn shared(self) -> StreamCore> { self.unary(Pipeline, "Shared", move |_, _| { move |input, output| { input.for_each_time(|time, data| { @@ -53,7 +53,7 @@ mod test { let shared = shared.inspect_container(|x| println!("seen: {x:?}")); scope .concatenate([ - shared.unary(Pipeline, "read shared 1", |_, _| { + shared.clone().unary(Pipeline, "read shared 1", |_, _| { move |input, output| { input.for_each_time(|time, data| { output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize)); diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 1ed563d1b..9dca4dc59 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -35,7 +35,7 @@ pub trait Reclock { /// .map(|_| ()); /// /// // reclock the data. 
- /// data.reclock(&clock) + /// data.reclock(clock) /// .capture() /// }); /// @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock(&self, clock: &StreamCore) -> Self; + fn reclock(self, clock: StreamCore) -> Self; } impl Reclock for StreamCore { - fn reclock(&self, clock: &StreamCore) -> StreamCore { + fn reclock(self, clock: StreamCore) -> StreamCore { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index 1fed3c1e9..2cca27c22 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -7,7 +7,7 @@ use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Accumulates records within a timestamp. -pub trait Accumulate { +pub trait Accumulate : Sized { /// Accumulates records within a timestamp. /// /// # Examples @@ -25,7 +25,7 @@ pub trait Accumulate { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(&self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; /// Counts the number of records observed at each time. 
/// /// # Examples @@ -43,13 +43,13 @@ pub trait Accumulate { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![10])]); /// ``` - fn count(&self) -> Stream { + fn count(self) -> Stream { self.accumulate(0, |sum, data| *sum += data.len()) } } impl, D: Data> Accumulate for Stream { - fn accumulate(&self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index aeeb26a20..a1b2d8f85 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -36,7 +36,7 @@ pub trait Delay { /// }); /// }); /// ``` - fn delayG::Timestamp+'static>(&self, func: L) -> Self; + fn delayG::Timestamp+'static>(self, func: L) -> Self; /// Advances the timestamp of records using a supplied function. /// @@ -63,7 +63,7 @@ pub trait Delay { /// }); /// }); /// ``` - fn delay_totalG::Timestamp+'static>(&self, func: L) -> Self + fn delay_totalG::Timestamp+'static>(self, func: L) -> Self where G::Timestamp: TotalOrder; /// Advances the timestamp of batches of records using a supplied function. 
@@ -91,11 +91,11 @@ pub trait Delay { /// }); /// }); /// ``` - fn delay_batchG::Timestamp+'static>(&self, func: L) -> Self; + fn delay_batchG::Timestamp+'static>(self, func: L) -> Self; } impl, D: Data> Delay for Stream { - fn delayG::Timestamp+'static>(&self, mut func: L) -> Self { + fn delayG::Timestamp+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { input.for_each_time(|time, data| { @@ -117,13 +117,13 @@ impl, D: Data> Delay for StreamG::Timestamp+'static>(&self, func: L) -> Self + fn delay_totalG::Timestamp+'static>(self, func: L) -> Self where G::Timestamp: TotalOrder { self.delay(func) } - fn delay_batchG::Timestamp+'static>(&self, mut func: L) -> Self { + fn delay_batchG::Timestamp+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { input.for_each_time(|time, data| { diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 03e05522d..53ca14373 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -19,11 +19,11 @@ pub trait Filter { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn filterbool+'static>(&self, predicate: P) -> Self; + fn filterbool+'static>(self, predicate: P) -> Self; } impl Filter for Stream { - fn filterbool+'static>(&self, mut predicate: P) -> Stream { + fn filterbool+'static>(self, mut predicate: P) -> Stream { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 4659383a9..e6669a732 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -104,7 
+104,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: StreamCore, pact: P) -> P::Puller where P: ParallelizationContract { @@ -113,7 +113,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> P::Puller + pub fn new_input_connection(&mut self, stream: StreamCore, pact: P, connection: I) -> P::Puller where P: ParallelizationContract, I: IntoIterator::Summary>)>, diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 08d499266..f98f9bf6d 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -54,7 +54,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore + pub fn new_input(&mut self, stream: StreamCore, pact: P) -> InputHandleCore where P: ParallelizationContract { @@ -70,7 +70,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. 
- pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: I) -> InputHandleCore + pub fn new_input_connection(&mut self, stream: StreamCore, pact: P, connection: I) -> InputHandleCore where P: ParallelizationContract, I: IntoIterator::Summary>)> + Clone, diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 4f64ebc8f..e0e13b59e 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -188,7 +188,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); -/// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { +/// in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); /// move |(input1, frontier1), (input2, frontier2), output| { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 027f17942..b95cf48eb 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -53,7 +53,7 @@ pub trait Operator { /// .container::>(); /// }); /// ``` - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(self, pact: P, name: &str, constructor: B) -> StreamCore where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -91,7 +91,7 @@ pub trait Operator { &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + (self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new 
dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -119,7 +119,7 @@ pub trait Operator { /// }); /// }); /// ``` - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(self, pact: P, name: &str, constructor: B) -> StreamCore where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -142,7 +142,7 @@ pub trait Operator { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); - /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { + /// in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); /// move |(input1, frontier1), (input2, frontier2), output| { @@ -175,7 +175,7 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, CB: ContainerBuilder, @@ -202,7 +202,7 @@ pub trait Operator { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); /// - /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { + /// in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { /// input1.for_each_time(|time, data| { /// output.session(&time).give_containers(data); /// notificator.notify_at(time.retain(output.output_index())); @@ -235,7 +235,7 @@ pub trait Operator { &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> - (&self, other: &StreamCore, 
pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; + (self, other: StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -251,7 +251,7 @@ pub trait Operator { /// timely::example(|scope| { /// let stream2 = (0u64..10).to_stream(scope); /// (0u64..10).to_stream(scope) - /// .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| { + /// .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| { /// let mut cap = Some(default_cap.delayed(&12)); /// move |input1, input2, output| { /// if let Some(ref c) = cap.take() { @@ -263,7 +263,7 @@ pub trait Operator { /// }).inspect(|x| println!("{:?}", x)); /// }); /// ``` - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, CB: ContainerBuilder, @@ -297,7 +297,7 @@ pub trait Operator { /// }); /// }); /// ``` - fn sink(&self, pact: P, name: &str, logic: L) + fn sink(self, pact: P, name: &str, logic: L) where L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, P: ParallelizationContract; @@ -305,7 +305,7 @@ pub trait Operator { impl Operator for StreamCore { - fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary_frontier(self, pact: P, name: &str, constructor: B) -> StreamCore where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -338,7 +338,7 @@ impl Operator for StreamCore { &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> - (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (self, pact: P, name: &str, 
init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -354,7 +354,7 @@ impl Operator for StreamCore { }) } - fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(self, pact: P, name: &str, constructor: B) -> StreamCore where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -380,7 +380,7 @@ impl Operator for StreamCore { stream } - fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, CB: ContainerBuilder, @@ -420,7 +420,7 @@ impl Operator for StreamCore { &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> - (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (self, other: StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -438,7 +438,7 @@ impl Operator for StreamCore { } - fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where C2: Container, CB: ContainerBuilder, @@ -471,7 +471,7 @@ impl Operator for StreamCore { stream } - fn sink(&self, pact: P, name: &str, mut logic: L) + fn sink(self, pact: P, name: &str, mut logic: L) where L: FnMut((&mut InputHandleCore, &MutableAntichain))+'static, P: ParallelizationContract { diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 97d0712ac..ff461c424 100644 --- a/timely/src/dataflow/operators/map.rs +++ 
b/timely/src/dataflow/operators/map.rs @@ -7,7 +7,7 @@ use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::operators::core::{Map as MapCore}; /// Extension trait for `Stream`. -pub trait Map { +pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -20,7 +20,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(&self, mut logic: L) -> Stream { + fn mapD2+'static>(self, mut logic: L) -> Stream { self.flat_map(move |x| std::iter::once(logic(x))) } /// Updates each element of the stream and yields the element, re-using memory where possible. @@ -35,7 +35,7 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_in_place(&self, logic: L) -> Stream; + fn map_in_place(self, logic: L) -> Stream; /// Consumes each element of the stream and yields some number of new elements. /// /// # Examples @@ -48,11 +48,11 @@ pub trait Map { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_mapI+'static>(&self, logic: L) -> Stream where I::Item: Data; + fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: Data; } impl Map for Stream { - fn map_in_place(&self, mut logic: L) -> Stream { + fn map_in_place(self, mut logic: L) -> Stream { self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); @@ -66,7 +66,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. 
- fn flat_mapI+'static>(&self, logic: L) -> Stream where I::Item: Data { + fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: Data { MapCore::flat_map(self, logic) } } diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index 6388efc63..bd0acda7d 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -14,19 +14,19 @@ pub trait Partition (u64, D2)> { /// use timely::dataflow::operators::{ToStream, Partition, Inspect}; /// /// timely::example(|scope| { - /// let streams = (0..10).to_stream(scope) - /// .partition(3, |x| (x % 3, x)); + /// let mut streams = (0..10).to_stream(scope) + /// .partition(3, |x| (x % 3, x)); /// - /// streams[0].inspect(|x| println!("seen 0: {:?}", x)); - /// streams[1].inspect(|x| println!("seen 1: {:?}", x)); - /// streams[2].inspect(|x| println!("seen 2: {:?}", x)); + /// streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x)); + /// streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x)); + /// streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x)); /// }); /// ``` - fn partition(&self, parts: u64, route: F) -> Vec>; + fn partition(self, parts: u64, route: F) -> Vec>; } impl(u64, D2)+'static> Partition for Stream { - fn partition(&self, parts: u64, route: F) -> Vec> { + fn partition(self, parts: u64, route: F) -> Vec> { PartitionCore::partition::, _, _>(self, parts, route) } } diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs index 6dddf8a71..5a5bf6210 100644 --- a/timely/src/dataflow/operators/result.rs +++ b/timely/src/dataflow/operators/result.rs @@ -18,7 +18,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn ok(&self) -> Stream; + fn ok(self) -> Stream; /// Returns a new instance of `self` containing only `err` records. 
/// @@ -32,7 +32,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn err(&self) -> Stream; + fn err(self) -> Stream; /// Returns a new instance of `self` applying `logic` on all `Ok` records. /// @@ -46,7 +46,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_ok T2 + 'static>(&self, logic: L) -> Stream>; + fn map_ok T2 + 'static>(self, logic: L) -> Stream>; /// Returns a new instance of `self` applying `logic` on all `Err` records. /// @@ -60,7 +60,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_err E2 + 'static>(&self, logic: L) -> Stream>; + fn map_err E2 + 'static>(self, logic: L) -> Stream>; /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` /// records. @@ -76,7 +76,7 @@ pub trait ResultStream { /// }); /// ``` fn and_then Result + 'static>( - &self, + self, logic: L, ) -> Stream>; @@ -92,31 +92,31 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn unwrap_or_else T + 'static>(&self, logic: L) -> Stream; + fn unwrap_or_else T + 'static>(self, logic: L) -> Stream; } impl ResultStream for Stream> { - fn ok(&self) -> Stream { + fn ok(self) -> Stream { self.flat_map(Result::ok) } - fn err(&self) -> Stream { + fn err(self) -> Stream { self.flat_map(Result::err) } - fn map_ok T2 + 'static>(&self, mut logic: L) -> Stream> { + fn map_ok T2 + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.map(&mut logic)) } - fn map_err E2 + 'static>(&self, mut logic: L) -> Stream> { + fn map_err E2 + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.map_err(&mut logic)) } - fn and_then Result + 'static>(&self, mut logic: L) -> Stream> { + fn and_then Result + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.and_then(&mut logic)) } - fn unwrap_or_else T + 'static>(&self, mut logic: L) -> Stream { + fn unwrap_or_else T 
+ 'static>(self, mut logic: L) -> Stream { self.map(move |r| r.unwrap_or_else(&mut logic)) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 0ae970164..d096c5d9e 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -27,7 +27,7 @@ pub struct StreamCore { ports: TeeHelper, } -impl Clone for StreamCore { +impl Clone for StreamCore { fn clone(&self) -> Self { Self { name: self.name, @@ -51,7 +51,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) where C: Clone+'static { + pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: Clone+'static { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs index b6c136755..755b6f365 100644 --- a/timely/tests/shape_scaling.rs +++ b/timely/tests/shape_scaling.rs @@ -28,7 +28,7 @@ fn operator_scaling(scale: u64) { let (output, stream) = builder.new_output_connection::,_>([]); use timely::progress::Antichain; let connectivity = [(index, Antichain::from_elem(Default::default()))]; - handles.push((builder.new_input_connection(&part, Pipeline, connectivity), output)); + handles.push((builder.new_input_connection(part, Pipeline, connectivity), output)); outputs.push(stream); } From 12639e2afe04186ddc0cd55ac7b8030e6bcc5f69 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 01:42:00 -0500 Subject: [PATCH 07/10] Remove Clone from Container --- .../dataflow/operators/core/capture/replay.rs | 2 +- timely/src/dataflow/operators/core/concat.rs | 12 +-------- timely/src/dataflow/operators/core/input.rs | 26 +++++++++---------- 
timely/src/dataflow/stream.rs | 2 +- timely/src/lib.rs | 4 +-- 5 files changed, 18 insertions(+), 28 deletions(-) diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index ec623abf0..7772d5ed9 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -62,7 +62,7 @@ pub trait Replay : Sized { fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore; } -impl Replay for I +impl Replay for I where I : IntoIterator, ::Item: EventIterator+'static, diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index c1d378372..720dc9a9c 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -24,7 +24,7 @@ pub trait Concat { impl Concat for StreamCore { fn concat(self, other: StreamCore) -> StreamCore { - self.scope().concatenate([self.clone(), other.clone()]) + self.scope().concatenate([self, other]) } } @@ -51,16 +51,6 @@ pub trait Concatenate { I: IntoIterator>; } -impl Concatenate for StreamCore { - fn concatenate(&self, sources: I) -> StreamCore - where - I: IntoIterator> - { - let clone = self.clone(); - self.scope().concatenate(Some(clone).into_iter().chain(sources)) - } -} - impl Concatenate for G { fn concatenate(&self, sources: I) -> StreamCore where diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 9d28ce0b2..cc558fb0e 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -59,7 +59,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore); + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore); /// Create a new [StreamCore] and [Handle] through which to supply input. 
/// @@ -96,7 +96,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input_with_builder(&mut self) -> (Handle<::Timestamp, CB>, StreamCore); + fn new_input_with_builder>(&mut self) -> (Handle<::Timestamp, CB>, StreamCore); /// Create a new stream from a supplied interactive handle. /// @@ -129,24 +129,24 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore; + fn input_from>(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore) { + fn new_input(&mut self) -> (Handle<::Timestamp, CapacityContainerBuilder>, StreamCore) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) } - fn new_input_with_builder(&mut self) -> (Handle<::Timestamp, CB>, StreamCore) { + fn new_input_with_builder>(&mut self) -> (Handle<::Timestamp, CB>, StreamCore) { let mut handle = Handle::new_with_builder(); let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore { + fn input_from>(&mut self, handle: &mut Handle<::Timestamp, CB>) -> StreamCore { let (output, registrar) = Tee::<::Timestamp, CB::Container>::new(); let counter = Counter::new(output); let produced = Rc::clone(counter.produced()); @@ -215,7 +215,7 @@ impl Operate for Operator { /// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct Handle { +pub struct Handle> { activate: Vec, progress: Vec>>>, pushers: Vec>>, @@ -224,7 +224,7 @@ pub struct Handle { now_at: T, } -impl Handle> { +impl Handle> { /// Allocates a new input handle, from which one can create timely streams. 
/// /// # Examples @@ -264,7 +264,7 @@ impl Handle> { } } -impl Handle { +impl> Handle { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -483,7 +483,7 @@ impl Handle { impl PushInto for Handle where T: Timestamp, - CB: ContainerBuilder + PushInto, + CB: ContainerBuilder + PushInto, { #[inline] fn push_into(&mut self, item: D) { @@ -492,7 +492,7 @@ where } } -impl Handle { +impl> Handle { /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. /// /// # Examples @@ -526,13 +526,13 @@ impl Handle { } } -impl Default for Handle { +impl> Default for Handle { fn default() -> Self { Self::new_with_builder() } } -impl Drop for Handle { +impl> Drop for Handle { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index d096c5d9e..47486f9c6 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -51,7 +51,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: Clone+'static { + pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { diff --git a/timely/src/lib.rs b/timely/src/lib.rs index a21c63343..065b6e9e9 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -107,8 +107,8 @@ impl Data for T { } /// A composite trait for types usable as containers in timely dataflow. /// /// The `Container` trait is necessary for all containers in timely dataflow channels. 
-pub trait Container: Accountable + Default + Clone + 'static { } -impl Container for C { } +pub trait Container: Accountable + Default + 'static { } +impl Container for C { } /// A composite trait for types usable as container builders in timely dataflow. pub trait ContainerBuilder: timely_container::ContainerBuilder + Default + 'static {} From 9a5e14d30d43d97d2322cb3ac399de6631982fb1 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 09:04:45 -0500 Subject: [PATCH 08/10] Replace Data trait by 'static --- timely/src/dataflow/channels/pushers/tee.rs | 4 ++-- .../dataflow/operators/aggregation/aggregate.rs | 8 ++++---- .../operators/aggregation/state_machine.rs | 8 ++++---- timely/src/dataflow/operators/branch.rs | 6 +++--- timely/src/dataflow/operators/broadcast.rs | 2 +- timely/src/dataflow/operators/count.rs | 9 ++++----- timely/src/dataflow/operators/delay.rs | 5 ++--- timely/src/dataflow/operators/filter.rs | 5 ++--- .../src/dataflow/operators/flow_controlled.rs | 9 ++++----- timely/src/dataflow/operators/input.rs | 9 ++++----- timely/src/dataflow/operators/map.rs | 11 +++++------ timely/src/dataflow/operators/partition.rs | 5 ++--- timely/src/dataflow/operators/result.rs | 17 ++++++++--------- timely/src/dataflow/operators/to_stream.rs | 5 ++--- .../src/dataflow/operators/unordered_input.rs | 6 ++---- timely/src/lib.rs | 10 ++-------- timely/src/synchronization/sequence.rs | 2 +- 17 files changed, 52 insertions(+), 69 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index e5842ab4d..c1a971e98 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -12,7 +12,7 @@ use std::rc::Rc; use crate::dataflow::channels::Message; use crate::communication::Push; -use crate::{Container, Data}; +use crate::Container; use push_set::{PushSet, PushOne, PushMany}; mod push_set { @@ -80,7 +80,7 @@ type PushList = Rc>>>>>; /// The writing half 
of a shared destination for pushing at. pub struct Tee { shared: PushList } -impl Push> for Tee { +impl Push> for Tee { #[inline] fn push(&mut self, message: &mut Option>) { if let Some(pushee) = self.shared.borrow_mut().as_mut() { diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 60a56fcff..388b615ba 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use std::collections::HashMap; -use crate::{Data, ExchangeData}; +use crate::ExchangeData; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -60,16 +60,16 @@ pub trait Aggregate { /// .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5))); /// }); /// ``` - fn aggregateR+'static, H: Fn(&K)->u64+'static>( + fn aggregateR+'static, H: Fn(&K)->u64+'static>( self, fold: F, emit: E, hash: H) -> Stream where S::Timestamp: Eq; } -impl, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate for Stream { +impl, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> Aggregate for Stream { - fn aggregateR+'static, H: Fn(&K)->u64+'static>( + fn aggregateR+'static, H: Fn(&K)->u64+'static>( self, fold: F, emit: E, diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/aggregation/state_machine.rs index 7bd67cc82..77c117409 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/aggregation/state_machine.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use std::collections::HashMap; -use crate::{Data, ExchangeData}; +use crate::ExchangeData; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -46,7 +46,7 @@ pub trait StateMachine { /// }); /// ``` fn state_machine< - R: 
Data, // output type + R: 'static, // output type D: Default+'static, // per-key state (data) I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic @@ -54,9 +54,9 @@ pub trait StateMachine { >(self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq ; } -impl StateMachine for Stream { +impl StateMachine for Stream { fn state_machine< - R: Data, // output type + R: 'static, // output type D: Default+'static, // per-key state (data) I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 46f224e9e..b03757b4c 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -4,10 +4,10 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::OutputBuilder; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::{Scope, Stream, StreamCore}; -use crate::{Container, Data}; +use crate::Container; /// Extension trait for `Stream`. -pub trait Branch { +pub trait Branch { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with a reference to /// the data and its time. 
If it returns `true`, the record will be sent @@ -35,7 +35,7 @@ pub trait Branch { ) -> (Stream, Stream); } -impl Branch for Stream { +impl Branch for Stream { fn branch( self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs index 8d99332e7..8a1979d12 100644 --- a/timely/src/dataflow/operators/broadcast.rs +++ b/timely/src/dataflow/operators/broadcast.rs @@ -21,7 +21,7 @@ pub trait Broadcast { fn broadcast(self) -> Self; } -impl Broadcast for Stream { +impl Broadcast for Stream { fn broadcast(self) -> Stream { // NOTE: Simplified implementation due to underlying motion diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index 2cca27c22..44df4f736 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -1,13 +1,12 @@ //! Counts the number of records at each time. use std::collections::HashMap; -use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Accumulates records within a timestamp. -pub trait Accumulate : Sized { +pub trait Accumulate : Sized { /// Accumulates records within a timestamp. /// /// # Examples @@ -25,7 +24,7 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; /// Counts the number of records observed at each time. 
/// /// # Examples @@ -48,8 +47,8 @@ pub trait Accumulate : Sized { } } -impl, D: Data> Accumulate for Stream { - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { +impl, D: 'static> Accumulate for Stream { + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index a1b2d8f85..c9151964f 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -2,14 +2,13 @@ use std::collections::HashMap; -use crate::Data; use crate::order::{PartialOrder, TotalOrder}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Methods to advance the timestamps of records or batches of records. -pub trait Delay { +pub trait Delay { /// Advances the timestamp of records using a supplied function. /// @@ -94,7 +93,7 @@ pub trait Delay { fn delay_batchG::Timestamp+'static>(self, func: L) -> Self; } -impl, D: Data> Delay for Stream { +impl, D: 'static> Delay for Stream { fn delayG::Timestamp+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index 53ca14373..a5042b784 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -1,12 +1,11 @@ //! Filters a stream by a predicate. -use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. 
-pub trait Filter { +pub trait Filter { /// Returns a new instance of `self` containing only records satisfying `predicate`. /// /// # Examples @@ -22,7 +21,7 @@ pub trait Filter { fn filterbool+'static>(self, predicate: P) -> Self; } -impl Filter for Stream { +impl Filter for Stream { fn filterbool+'static>(self, mut predicate: P) -> Stream { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index c9e30d334..1f8600d89 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -1,6 +1,5 @@ //! Methods to construct flow-controlled sources. -use crate::Data; use crate::order::{PartialOrder, TotalOrder}; use crate::progress::timestamp::Timestamp; use crate::dataflow::operators::generic::operator::source; @@ -8,7 +7,7 @@ use crate::dataflow::operators::probe::Handle; use crate::dataflow::{Stream, Scope}; /// Output of the input reading function for iterator_source. -pub struct IteratorSourceInput, I: IntoIterator> { +pub struct IteratorSourceInput, I: IntoIterator> { /// Lower bound on timestamps that can be emitted by this input in the future. pub lower_bound: T, /// Any `T: IntoIterator` of new input data in the form (time, data): time must be @@ -20,8 +19,8 @@ pub struct IteratorSourceInput, I: I } /// Construct a source that repeatedly calls the provided function to ingest input. -/// -/// The function can return `None` to signal the end of the input. +/// +/// The function can return `None` to signal the end of the input. 
/// Otherwise, it should return a [`IteratorSourceInput`], where: /// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future, /// `Default::default()` can be used if this isn't needed (the source will assume that @@ -71,7 +70,7 @@ pub struct IteratorSourceInput, I: I /// ``` pub fn iterator_source< G: Scope, - D: Data, + D: 'static, DI: IntoIterator, I: IntoIterator, F: FnMut(&G::Timestamp)->Option>+'static>( diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 30d243ab4..b9aba80eb 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -1,6 +1,5 @@ //! Create new `Streams` connected to external inputs. -use crate::Data; use crate::container::CapacityContainerBuilder; use crate::dataflow::{Stream, ScopeParent, Scope}; use crate::dataflow::operators::core::{Input as InputCore}; @@ -47,7 +46,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); + fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); /// Create a new stream from a supplied interactive handle. 
/// @@ -79,16 +78,16 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { + fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { InputCore::new_input(self) } - fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { InputCore::input_from(self, handle) } } diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index ff461c424..abb65f517 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -1,13 +1,12 @@ //! Extension methods for `Stream` based on record-by-record transformation. -use crate::Data; use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::operators::core::{Map as MapCore}; /// Extension trait for `Stream`. -pub trait Map : Sized { +pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples @@ -20,7 +19,7 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(self, mut logic: L) -> Stream { + fn mapD2+'static>(self, mut logic: L) -> Stream { self.flat_map(move |x| std::iter::once(logic(x))) } /// Updates each element of the stream and yields the element, re-using memory where possible. 
@@ -48,10 +47,10 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: Data; + fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: 'static; } -impl Map for Stream { +impl Map for Stream { fn map_in_place(self, mut logic: L) -> Stream { self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { input.for_each_time(|time, data| { @@ -66,7 +65,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: Data { + fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: 'static { MapCore::flat_map(self, logic) } } diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index bd0acda7d..eae748645 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -3,10 +3,9 @@ use crate::container::CapacityContainerBuilder; use crate::dataflow::operators::core::Partition as PartitionCore; use crate::dataflow::{Scope, Stream}; -use crate::Data; /// Partition a stream of records into multiple streams. -pub trait Partition (u64, D2)> { +pub trait Partition (u64, D2)> { /// Produces `parts` output streams, containing records produced and assigned by `route`. 
/// /// # Examples @@ -25,7 +24,7 @@ pub trait Partition (u64, D2)> { fn partition(self, parts: u64, route: F) -> Vec>; } -impl(u64, D2)+'static> Partition for Stream { +impl(u64, D2)+'static> Partition for Stream { fn partition(self, parts: u64, route: F) -> Vec> { PartitionCore::partition::, _, _>(self, parts, route) } diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/result.rs index 5a5bf6210..b57124956 100644 --- a/timely/src/dataflow/operators/result.rs +++ b/timely/src/dataflow/operators/result.rs @@ -1,11 +1,10 @@ //! Extension methods for `Stream` containing `Result`s. -use crate::Data; use crate::dataflow::operators::Map; use crate::dataflow::{Scope, Stream}; /// Extension trait for `Stream`. -pub trait ResultStream { +pub trait ResultStream { /// Returns a new instance of `self` containing only `ok` records. /// /// # Examples @@ -46,7 +45,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_ok T2 + 'static>(self, logic: L) -> Stream>; + fn map_ok T2 + 'static>(self, logic: L) -> Stream>; /// Returns a new instance of `self` applying `logic` on all `Err` records. /// @@ -60,7 +59,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_err E2 + 'static>(self, logic: L) -> Stream>; + fn map_err E2 + 'static>(self, logic: L) -> Stream>; /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err` /// records. 
@@ -75,7 +74,7 @@ pub trait ResultStream { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn and_then Result + 'static>( + fn and_then Result + 'static>( self, logic: L, ) -> Stream>; @@ -95,7 +94,7 @@ pub trait ResultStream { fn unwrap_or_else T + 'static>(self, logic: L) -> Stream; } -impl ResultStream for Stream> { +impl ResultStream for Stream> { fn ok(self) -> Stream { self.flat_map(Result::ok) } @@ -104,15 +103,15 @@ impl ResultStream for Stream T2 + 'static>(self, mut logic: L) -> Stream> { + fn map_ok T2 + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.map(&mut logic)) } - fn map_err E2 + 'static>(self, mut logic: L) -> Stream> { + fn map_err E2 + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.map_err(&mut logic)) } - fn and_then Result + 'static>(self, mut logic: L) -> Stream> { + fn and_then Result + 'static>(self, mut logic: L) -> Stream> { self.map(move |r| r.and_then(&mut logic)) } diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index a85677dc0..06a2af090 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -1,11 +1,10 @@ //! Conversion to the `Stream` type from iterators. -use crate::Data; use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::core::{ToStream as ToStreamCore}; /// Converts to a timely `Stream`. -pub trait ToStream { +pub trait ToStream { /// Converts to a timely `Stream`. 
/// /// # Examples @@ -25,7 +24,7 @@ pub trait ToStream { fn to_stream(self, scope: &mut S) -> Stream; } -impl ToStream for I where I::Item: Data { +impl ToStream for I where I::Item: 'static { fn to_stream(self, scope: &mut S) -> Stream { ToStreamCore::to_stream(self, scope) } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 6f6b8af48..e99e8ba39 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -1,7 +1,5 @@ //! Create new `Streams` connected to external inputs. -use crate::Data; - use crate::container::CapacityContainerBuilder; use crate::dataflow::operators::{ActivateCapability}; use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore}; @@ -62,12 +60,12 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { UnorderedInputCore::new_unordered_input(self) } } diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 065b6e9e9..fef123620 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -98,12 +98,6 @@ pub mod logging; pub mod scheduling; -/// A composite trait for types usable as data in timely dataflow. -/// -/// The `Data` trait is necessary for all types that go along timely dataflow channels. -pub trait Data: Clone+'static { } -impl Data for T { } - /// A composite trait for types usable as containers in timely dataflow. /// /// The `Container` trait is necessary for all containers in timely dataflow channels. 
@@ -118,8 +112,8 @@ impl + Default + 's /// /// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication` /// `Data` trait, which describes requirements for communication along channels. -pub trait ExchangeData: Data + encoding::Data { } -impl ExchangeData for T { } +pub trait ExchangeData: encoding::Data + 'static { } +impl ExchangeData for T { } #[doc = include_str!("../../README.md")] #[cfg(doctest)] diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 2634b729c..092e61240 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -52,7 +52,7 @@ pub struct Sequencer { recv: Rc>>, // sequenced items. } -impl Sequencer { +impl Sequencer { /// Creates a new Sequencer. /// From 2d9ae696053852301310a98ef197ce68748e9515 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 09:11:35 -0500 Subject: [PATCH 09/10] Copy loopdemo improvement --- timely/examples/loopdemo.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/examples/loopdemo.rs b/timely/examples/loopdemo.rs index 8875ca1c1..681d8d69e 100644 --- a/timely/examples/loopdemo.rs +++ b/timely/examples/loopdemo.rs @@ -31,8 +31,8 @@ fn main() { .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 }) .filter(|x| x > &1); - step.clone().connect_loop(loop_handle); - step.probe_with(&probe); + step.probe_with(&probe) + .connect_loop(loop_handle); }); let ns_per_request = 1_000_000_000 / rate; From 7294a037c37568cb7721d2c7674f239d43dd6833 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 21 Feb 2026 16:46:07 -0500 Subject: [PATCH 10/10] Streamlining constraints, formatting --- timely/src/dataflow/mod.rs | 2 +- .../operators/aggregation/aggregate.rs | 2 +- timely/src/dataflow/operators/branch.rs | 2 +- .../operators/core/capture/capture.rs | 2 +- timely/src/dataflow/operators/core/concat.rs | 2 +- .../src/dataflow/operators/core/enterleave.rs | 2 +- 
timely/src/dataflow/operators/core/inspect.rs | 4 ++-- timely/src/dataflow/operators/core/map.rs | 8 +++---- timely/src/dataflow/operators/core/rc.rs | 3 +-- .../operators/core/unordered_input.rs | 2 +- timely/src/dataflow/operators/count.rs | 4 ++-- timely/src/dataflow/operators/filter.rs | 2 +- .../dataflow/operators/generic/builder_raw.rs | 4 ++-- timely/src/dataflow/operators/to_stream.rs | 2 +- .../src/dataflow/operators/unordered_input.rs | 4 ++-- timely/src/dataflow/stream.rs | 24 +++++++++++++++++++ timely/src/lib.rs | 7 +----- timely/src/synchronization/sequence.rs | 2 +- 18 files changed, 47 insertions(+), 31 deletions(-) diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index 043317fd8..e2ac17e14 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -13,7 +13,7 @@ //! }); //! ``` -pub use self::stream::{StreamCore, Stream}; +pub use self::stream::{Stream, StreamCore}; pub use self::scopes::{Scope, ScopeParent}; pub use self::operators::core::input::Handle as InputHandleCore; diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/aggregation/aggregate.rs index 388b615ba..27bd594ae 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/aggregation/aggregate.rs @@ -67,7 +67,7 @@ pub trait Aggregate { hash: H) -> Stream where S::Timestamp: Eq; } -impl, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> Aggregate for Stream { +impl, K: ExchangeData+Clone+Hash+Eq, V: ExchangeData> Aggregate for Stream { fn aggregateR+'static, H: Fn(&K)->u64+'static>( self, diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index b03757b4c..9b618ee07 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -7,7 +7,7 @@ use crate::dataflow::{Scope, Stream, StreamCore}; use crate::Container; /// Extension trait for `Stream`. 
-pub trait Branch { +pub trait Branch { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with a reference to /// the data and its time. If it returns `true`, the record will be sent diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index efc48506e..b20a63005 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -125,7 +125,7 @@ impl Capture for StreamCore { if !started { // discard initial capability. - progress.frontiers[0].update(S::Timestamp::minimum(), -1); + progress.frontiers[0].update(Timestamp::minimum(), -1); started = true; } if !progress.frontiers[0].is_empty() { diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index 720dc9a9c..b94db00bf 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -19,7 +19,7 @@ pub trait Concat { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn concat(self, _: StreamCore) -> StreamCore; + fn concat(self, other: StreamCore) -> StreamCore; } impl Concat for StreamCore { diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index c8f77fc16..3a1ab0b93 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -132,7 +132,7 @@ impl> Leave for struct IngressNub, TContainer: Container> { targets: Counter>, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, activator: crate::scheduling::Activator, active: bool, } diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index 27ff694de..a4b235455 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ 
-123,12 +123,12 @@ pub trait InspectCore { /// }); /// }); /// ``` - fn inspect_container(self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; + fn inspect_container(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } impl InspectCore for StreamCore { - fn inspect_container(self, mut func: F) -> StreamCore + fn inspect_container(self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { use crate::progress::timestamp::Timestamp; diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 23d603e7d..57ab43b0a 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -76,11 +76,9 @@ pub trait Map : Sized { /// /// assert_eq!((4..14).collect::>(), data.extract()[0].1); /// ``` - fn flat_map_builder<'t, I, L>(self, logic: L) -> FlatMapBuilder + fn flat_map_builder(self, logic: L) -> FlatMapBuilder where - C: Clone + 'static, L: for<'a> Fn(C::Item<'a>) -> I, - Self: Sized, { FlatMapBuilder::new(self, logic) } @@ -116,7 +114,7 @@ where marker: std::marker::PhantomData, } -impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder +impl FlatMapBuilder where for<'a> F: Fn(C::Item<'a>) -> I, { @@ -125,7 +123,7 @@ where FlatMapBuilder { stream, logic, marker: std::marker::PhantomData } } - /// Transform a flatmapped stream through addiitonal logic. + /// Transform a flatmapped stream through additional logic. 
pub fn map I2 + 'static, I2>(self, g: G) -> FlatMapBuilder) -> I2 + 'static, I2> { let logic = self.logic; FlatMapBuilder { diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index 3e49e1fd7..11174b1a9 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -44,13 +44,12 @@ mod test { use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::capture::Extract; use crate::dataflow::operators::rc::SharedStream; - use crate::dataflow::operators::{Capture, Concatenate, InspectCore, Operator, ToStream}; + use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream}; #[test] fn test_shared() { let output = crate::example(|scope| { let shared = vec![Ok(0), Err(())].to_stream(scope).container::>().shared(); - let shared = shared.inspect_container(|x| println!("seen: {x:?}")); scope .concatenate([ shared.clone().unary(Pipeline, "read shared 1", |_, _| { diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 6d8574d52..ac4f40f97 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -11,7 +11,7 @@ use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; use crate::progress::operate::Connectivity; -use crate::dataflow::channels::pushers::{Counter, Tee, Output}; +use crate::dataflow::channels::pushers::{Counter, Output, Tee}; use crate::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; use crate::dataflow::operators::{ActivateCapability, Capability}; use crate::dataflow::{Scope, StreamCore}; diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/count.rs index 44df4f736..349b542f6 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/count.rs @@ -24,7 
+24,7 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; /// Counts the number of records observed at each time. /// /// # Examples @@ -48,7 +48,7 @@ pub trait Accumulate : Sized { } impl, D: 'static> Accumulate for Stream { - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/filter.rs index a5042b784..0481dac35 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/filter.rs @@ -5,7 +5,7 @@ use crate::dataflow::{Stream, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. -pub trait Filter { +pub trait Filter { /// Returns a new instance of `self` containing only records satisfying `predicate`. 
/// /// # Examples diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index e6669a732..b23206046 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -146,7 +146,7 @@ impl OperatorBuilder { { let new_output = self.shape.outputs; self.shape.outputs += 1; - let (targets, registrar) = Tee::::new(); + let (target, registrar) = Tee::new(); let source = Source::new(self.index, new_output); let stream = StreamCore::new(source, registrar, self.scope.clone()); @@ -154,7 +154,7 @@ impl OperatorBuilder { self.summary[input].add_port(new_output, entry); } - (targets, stream) + (target, stream) } /// Creates an operator implementation from supplied logic constructor. diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 06a2af090..b7916fb1c 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -24,7 +24,7 @@ pub trait ToStream { fn to_stream(self, scope: &mut S) -> Stream; } -impl ToStream for I where I::Item: 'static { +impl ToStream for I { fn to_stream(self, scope: &mut S) -> Stream { ToStreamCore::to_stream(self, scope) } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index e99e8ba39..afdc2253a 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -60,12 +60,12 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, 
ActivateCapability), Stream) { UnorderedInputCore::new_unordered_input(self) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 47486f9c6..1544ad0b6 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -98,3 +98,27 @@ where .finish_non_exhaustive() } } + +#[cfg(test)] +mod tests { + use crate::dataflow::channels::pact::Pipeline; + use crate::dataflow::operators::{Operator, ToStream}; + + #[derive(Debug, Eq, PartialEq)] + struct NotClone; + + #[test] + fn test_non_clone_stream() { + crate::example(|scope| { + let _ = [NotClone] + .to_stream(scope) + .sink(Pipeline, "check non-clone", |(input, _frontier)| { + input.for_each(|_cap, data| { + for datum in data.drain(..) { + assert_eq!(datum, NotClone); + } + }); + }); + }); + } +} diff --git a/timely/src/lib.rs b/timely/src/lib.rs index fef123620..a4c8c1326 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -108,12 +108,7 @@ impl Container for C { } pub trait ContainerBuilder: timely_container::ContainerBuilder + Default + 'static {} impl + Default + 'static> ContainerBuilder for CB {} -/// A composite trait for types usable on exchange channels in timely dataflow. -/// -/// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication` -/// `Data` trait, which describes requirements for communication along channels. -pub trait ExchangeData: encoding::Data + 'static { } -impl ExchangeData for T { } +pub use encoding::Data as ExchangeData; #[doc = include_str!("../../README.md")] #[cfg(doctest)] diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 092e61240..c02ea2506 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -229,4 +229,4 @@ impl Drop for Sequencer { .expect("Sequencer.activator unavailable") .activate() } -} \ No newline at end of file +}