Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
8 changes: 0 additions & 8 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,4 @@ impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
&self.produced
}
/// Ships a time and a container.
///
/// This is not a validated capability, and this method should not be used without great care.
/// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
/// to uphold invariants.
#[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
if !container.is_empty() { Message::push_at(container, time, self); }
}
}
5 changes: 3 additions & 2 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::progress::Timestamp;
use super::Event;
use super::event::EventIterator;
use crate::Container;
use crate::dataflow::channels::Message;

/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
Expand Down Expand Up @@ -99,14 +100,14 @@ where
progress.internals[0].extend(vec.into_iter());
},
Owned(Event::Messages(time, mut data)) => {
output.give(time.clone(), &mut data);
Message::push_at(&mut data, time, &mut output);
}
Borrowed(Event::Progress(vec)) => {
progress.internals[0].extend(vec.iter().cloned());
},
Borrowed(Event::Messages(time, data)) => {
allocation.clone_from(data);
output.give(time.clone(), &mut allocation);
Message::push_at(&mut allocation, time.clone(), &mut output);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions timely/src/dataflow/operators/core/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;

use crate::dataflow::{StreamCore, Scope};
use crate::Container;
use crate::dataflow::channels::Message;

/// Monitors progress at a `Stream`.
pub trait Probe<G: Scope, C: Container> {
Expand Down Expand Up @@ -112,9 +113,7 @@ impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
}

while let Some(message) = input.next() {
let time = &message.time;
let data = &mut message.data;
output.give(time.clone(), data);
Message::push_at(&mut message.data, message.time.clone(), &mut output);
}
use timely_communication::Push;
output.done();
Expand Down
Loading