From 503ca852dcda42c5ede0f5eaa5e65aa52c14c6d5 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 21 Dec 2022 14:20:19 +0100 Subject: [PATCH] dataflow: Account for probe handles being dropped The lifetime of a probe handle is not attached to any particular dataflow and users are free to connect them to streams of multiple dataflows. This PR handles the situation in which a probe handle is connected to two separate dataflows and one of them drops. Previously the handle would have its frontier stuck because the old handle wouldn't provide any updates anymore. The solution is that each handle keeps track of its own changes and removes them from the overall calculation on drop. Signed-off-by: Petros Angelatos --- timely/src/dataflow/operators/probe.rs | 39 ++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..51dd5039d 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -4,6 +4,7 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::Timestamp; +use crate::order::PartialOrder; use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::dataflow::channels::pushers::CounterCore as PushCounter; use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; @@ -94,7 +95,7 @@ impl Probe for StreamCore { let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); - let shared_frontier = handle.frontier.clone(); + let mut handle = handle.clone(); let mut started = false; let mut vector = Default::default(); @@ -103,8 +104,7 @@ impl Probe for StreamCore { move |progress| { // surface all frontier changes to the shared frontier. - let mut borrow = shared_frontier.borrow_mut(); - borrow.update_iter(progress.frontiers[0].drain()); + handle.update_iter(progress.frontiers[0].drain()); if !started { // discard initial capability. @@ -139,7 +139,10 @@ impl Probe for StreamCore { /// Reports information about progress at the probe. #[derive(Debug)] pub struct Handle { - frontier: Rc>> + /// The overall shared frontier managed by all the handles + frontier: Rc>>, + /// The private frontier containing the changes produced by this handle only + handle_frontier: MutableAntichain, } impl Handle { @@ -150,7 +153,21 @@ impl Handle { /// returns true iff the frontier is empty. #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() } /// Allocates a new handle. - #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } } + #[inline] pub fn new() -> Self { + Handle { + frontier: Rc::new(RefCell::new(MutableAntichain::new())), + handle_frontier: MutableAntichain::new() + } + } + + #[inline] + fn update_iter(&mut self, updates: I) + where + T: Clone + PartialOrder + Ord, + I: IntoIterator, + { + self.frontier.borrow_mut().update_iter(self.handle_frontier.update_iter(updates)); + } /// Invokes a method on the frontier, returning its result. /// @@ -171,10 +188,20 @@ impl Handle { } } +impl Drop for Handle { + fn drop(&mut self) { + // This handle is being dropped so remove it from the overall calculation + self.frontier.borrow_mut().update_iter( + self.handle_frontier.frontier().iter().map(|t| (t.clone(), -1)) + ); + } +} + impl Clone for Handle { fn clone(&self) -> Self { Handle { - frontier: self.frontier.clone() + frontier: self.frontier.clone(), + handle_frontier: MutableAntichain::new(), } } }