From d9c32a77df544ecd578157150a8ab4eaeb2893e8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Oct 2025 20:06:05 -0400 Subject: [PATCH] Record progress updates for direct container sends --- timely/src/dataflow/channels/pushers/counter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 249569937..4443638ed 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -3,7 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::progress::{ChangeBatch, Timestamp}; +use crate::progress::ChangeBatch; use crate::dataflow::channels::Message; use crate::communication::Push; use crate::Accountable; @@ -15,7 +15,7 @@ pub struct Counter { produced: Rc>>, } -impl Push> for Counter where P: Push> { +impl Push> for Counter where P: Push> { #[inline] fn push(&mut self, message: &mut Option>) { if let Some(message) = message { @@ -48,6 +48,6 @@ impl Counter where T : Ord+Clone+'static { /// Ideally, users would not have direct access to a `Counter`, and preventing this is the way /// to uphold invariants. #[inline] pub fn give(&mut self, time: T, container: &mut C) where P: Push> { - if !container.is_empty() { Message::push_at(container, time, &mut self.pushee); } + if !container.is_empty() { Message::push_at(container, time, self); } } }