From 71160cb92787ed3abf02bad66add9b81c6d800e7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 28 Oct 2025 14:46:27 +0100 Subject: [PATCH] Remove push counter's give Remove the `give` function from `pushers::counters::Counter` as it is a thin wrapper around `Message::push` and caused confusing bugs in the past. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/channels/pushers/buffer.rs | 0 timely/src/dataflow/channels/pushers/counter.rs | 8 -------- timely/src/dataflow/operators/core/capture/replay.rs | 5 +++-- timely/src/dataflow/operators/core/probe.rs | 5 ++--- 4 files changed, 5 insertions(+), 13 deletions(-) delete mode 100644 timely/src/dataflow/channels/pushers/buffer.rs diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs deleted file mode 100644 index e69de29bb..000000000 diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 4443638ed..ddbe7b761 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -42,12 +42,4 @@ impl Counter where T : Ord+Clone+'static { pub fn produced(&self) -> &Rc>> { &self.produced } - /// Ships a time and a container. - /// - /// This is not a validated capability, and this method should not be used without great care. - /// Ideally, users would not have direct access to a `Counter`, and preventing this is the way - /// to uphold invariants. - #[inline] pub fn give(&mut self, time: T, container: &mut C) where P: Push> { - if !container.is_empty() { Message::push_at(container, time, self); } - } } diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index cbb120e1a..70a4d9c1b 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -46,6 +46,7 @@ use crate::progress::Timestamp; use super::Event; use super::event::EventIterator; use crate::Container; +use crate::dataflow::channels::Message; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { @@ -99,14 +100,14 @@ where progress.internals[0].extend(vec.into_iter()); }, Owned(Event::Messages(time, mut data)) => { - output.give(time.clone(), &mut data); + Message::push_at(&mut data, time, &mut output); } Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); }, Borrowed(Event::Messages(time, data)) => { allocation.clone_from(data); - output.give(time.clone(), &mut allocation); + Message::push_at(&mut allocation, time.clone(), &mut output); } } } diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index c6af04cdb..5b8821bf8 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -13,6 +13,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::dataflow::{StreamCore, Scope}; use crate::Container; +use crate::dataflow::channels::Message; /// Monitors progress at a `Stream`. pub trait Probe { @@ -112,9 +113,7 @@ impl Probe for StreamCore { } while let Some(message) = input.next() { - let time = &message.time; - let data = &mut message.data; - output.give(time.clone(), data); + Message::push_at(&mut message.data, message.time.clone(), &mut output); } use timely_communication::Push; output.done();