From 863d44783107ede34ebfaddd96b8c653033d2be2 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Sun, 21 Sep 2025 11:59:27 -0400
Subject: [PATCH 1/3] Updates for timely 0.25

---
 Cargo.toml                                    |  4 +--
 .../examples/iterate_container.rs             |  3 +-
 .../examples/multitemporal.rs                 |  3 +-
 differential-dataflow/examples/progress.rs    |  2 +-
 .../src/algorithms/graphs/propagate.rs        |  6 ++--
 .../src/algorithms/graphs/scc.rs              |  4 +--
 .../src/algorithms/graphs/sequential.rs       |  2 +-
 differential-dataflow/src/capture.rs          | 30 +++++++++++-------
 differential-dataflow/src/collection.rs       |  1 +
 differential-dataflow/src/containers.rs       |  7 -----
 differential-dataflow/src/dynamic/mod.rs      |  5 +--
 .../src/operators/arrange/arrangement.rs      | 18 +++++------
 .../src/operators/arrange/upsert.rs           | 16 +++++-----
 differential-dataflow/src/operators/count.rs  |  2 +-
 differential-dataflow/src/operators/join.rs   | 29 ++++++++---------
 differential-dataflow/src/operators/reduce.rs |  2 +-
 .../src/operators/threshold.rs                |  2 +-
 dogsdogsdogs/examples/delta_query2.rs         | 10 +++---
 dogsdogsdogs/src/lib.rs                       |  2 +-
 dogsdogsdogs/src/operators/count.rs           |  2 +-
 dogsdogsdogs/src/operators/half_join.rs       | 31 ++++++++-----------
 dogsdogsdogs/src/operators/lookup_map.rs      | 11 ++++---
 dogsdogsdogs/src/operators/propose.rs         |  2 ++
 dogsdogsdogs/src/operators/validate.rs        |  1 +
 experiments/src/bin/graphs-interactive-alt.rs |  2 +-
 experiments/src/bin/multitemporal.rs          | 21 ++++++++-----
 interactive/src/logging.rs                    | 28 +++++++++++------
 27 files changed, 132 insertions(+), 114 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e42d7aa08..4cdbfb214 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,9 +17,9 @@ resolver = "2"
 
 [workspace.dependencies]
 differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.17.0" }
-timely = { version = "0.24", default-features = false }
+#timely = { version = "0.24", default-features = false }
 columnar = { version = "0.10", default-features = false }
-#timely = { path = "../timely-dataflow/timely/", default-features = false }
+timely = { path = "../timely-dataflow/timely/", default-features = false }
 
 [profile.release]
 opt-level = 3
diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs
index 9c941a8cc..72c4b5fcb 100644
--- a/differential-dataflow/examples/iterate_container.rs
+++ b/differential-dataflow/examples/iterate_container.rs
@@ -40,8 +40,7 @@ fn wrap(stream: &StreamCore) -> StreamCore
     builder.build(move |_capability| move |_frontier| {
         let mut output = output.activate();
         input.for_each(|time, data| {
-            let mut session = output.session(&time);
-            session.give_container(&mut ContainerWrapper(std::mem::take(data)));
+            output.give(&time, &mut ContainerWrapper(std::mem::take(data)));
         });
     });
     stream_out
diff --git a/differential-dataflow/examples/multitemporal.rs b/differential-dataflow/examples/multitemporal.rs
index 905960318..36acbd5e7 100644
--- a/differential-dataflow/examples/multitemporal.rs
+++ b/differential-dataflow/examples/multitemporal.rs
@@ -59,7 +59,8 @@ fn main() {
                     let time = Pair::new(arguments[1], arguments[2]);
                     if capability.time().less_equal(&time) {
                         input
-                            .session(capability.clone())
+                            .activate()
+                            .session(&capability)
                             .give((arguments[0], time, arguments[3]));
                     } else {
                         println!("Requested time {:?} no longer open (input from {:?})", time, capability.time());
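Taken together, the two example changes above are the session-level migration in miniature. A condensed before/after sketch, assuming only the timely 0.24 and 0.25 interfaces that this patch itself exercises (handle and variable names are illustrative):

    // timely 0.24: open a session against an output, then feed a container.
    //     let mut session = output.session(&time);
    //     session.give_container(&mut container);
    // timely 0.25: activated outputs accept a time and container directly.
    output.give(&time, &mut container);

    // timely 0.24: unordered inputs opened sessions on a cloned capability.
    //     input.session(capability.clone()).give(record);
    // timely 0.25: activate the handle first, then borrow the capability.
    input.activate()
        .session(&capability)
        .give(record);
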
diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs
index 8be8cb7a4..9be35a9cc 100644
--- a/differential-dataflow/examples/progress.rs
+++ b/differential-dataflow/examples/progress.rs
@@ -121,7 +121,7 @@ fn frontier(
 ) -> VecCollection
 where
     G: Scope,
-    T: Timestamp,
+    T: Timestamp+std::hash::Hash,
 {
     // Translate node and edge transitions into a common Location to Location edge with an associated Summary.
     let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs
index 4d569a151..2c0671b62 100644
--- a/differential-dataflow/src/algorithms/graphs/propagate.rs
+++ b/differential-dataflow/src/algorithms/graphs/propagate.rs
@@ -16,7 +16,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey;
 /// method to limit the introduction of labels.
 pub fn propagate(edges: &VecCollection, nodes: &VecCollection) -> VecCollection
 where
-    G: Scope,
+    G: Scope,
     N: ExchangeData+Hash,
     R: ExchangeData+Abelian,
     R: Multiply,
@@ -33,7 +33,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey;
 /// method to limit the introduction of labels.
 pub fn propagate_at(edges: &VecCollection, nodes: &VecCollection, logic: F) -> VecCollection
 where
-    G: Scope,
+    G: Scope,
     N: ExchangeData+Hash,
     R: ExchangeData+Abelian,
     R: Multiply,
@@ -60,7 +60,7 @@ where
     R: Multiply,
     R: From,
     L: ExchangeData,
-    Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
+    Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
     F: Fn(&L)->u64+Clone+'static,
 {
     // Morally the code performs the following iterative computation. However, in the interest of a simplified
diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs
index 7d570b8c9..c3ad20a81 100644
--- a/differential-dataflow/src/algorithms/graphs/scc.rs
+++ b/differential-dataflow/src/algorithms/graphs/scc.rs
@@ -35,7 +35,7 @@ where
 /// Returns the subset of edges in the same strongly connected component.
 pub fn strongly_connected(graph: &VecCollection) -> VecCollection
 where
-    G: Scope,
+    G: Scope,
     N: ExchangeData + Hash,
     R: ExchangeData + Abelian,
     R: Multiply,
@@ -51,7 +51,7 @@ where
 fn trim_edges(cycle: &VecCollection, edges: &VecCollection)
     -> VecCollection
 where
-    G: Scope,
+    G: Scope,
     N: ExchangeData + Hash,
     R: ExchangeData + Abelian,
     R: Multiply,
diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs
index 598fab128..3f4bbcf29 100644
--- a/differential-dataflow/src/algorithms/graphs/sequential.rs
+++ b/differential-dataflow/src/algorithms/graphs/sequential.rs
@@ -11,7 +11,7 @@ use crate::hashable::Hashable;
 
 fn _color(edges: &VecCollection) -> VecCollection)>
 where
-    G: Scope,
+    G: Scope,
     N: ExchangeData+Hash,
 {
     // need some bogus initial values.
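The recurring theme in these hunks is a new `Hash` requirement on timestamp types: `T: Timestamp` becomes `T: Timestamp+std::hash::Hash`, and trace bounds gain `Time: Hash`. A minimal sketch of what the strengthened bound permits (the function is illustrative, not from the patch):

    use std::collections::HashSet;
    use std::hash::Hash;
    use timely::progress::Timestamp;

    // With the added bound, timestamps can serve as hashed keys.
    fn dedup_times<T: Timestamp + Hash + Eq>(times: &[T]) -> HashSet<T> {
        times.iter().cloned().collect()
    }
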
diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs
index 049441950..dbcc653b3 100644
--- a/differential-dataflow/src/capture.rs
+++ b/differential-dataflow/src/capture.rs
@@ -228,6 +228,7 @@ pub mod source {
     use std::marker::{Send, Sync};
     use std::sync::Arc;
     use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
+    use timely::dataflow::operators::generic::OutputBuilder;
     use timely::progress::Timestamp;
     use timely::scheduling::SyncActivator;
 
@@ -313,8 +314,11 @@ pub mod source {
         let activator2 = scope.activator_for(Rc::clone(&address));
         let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) };
         let mut source = source_builder(activator);
-        let (mut updates_out, updates) = messages_op.new_output();
-        let (mut progress_out, progress) = messages_op.new_output();
+        let (updates_out, updates) = messages_op.new_output();
+        let mut updates_out = OutputBuilder::from(updates_out);
+        let (progress_out, progress) = messages_op.new_output();
+        let mut progress_out = OutputBuilder::from(progress_out);
+
         messages_op.build(|capabilities| {
 
             // A Weak that communicates whether the returned token has been dropped.
@@ -387,8 +391,11 @@ pub mod source {
         // Step 2: The UPDATES operator.
         let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
         let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
-        let (mut changes_out, changes) = updates_op.new_output();
-        let (mut counts_out, counts) = updates_op.new_output();
+        let (changes_out, changes) = updates_op.new_output();
+        let mut changes_out = OutputBuilder::from(changes_out);
+        let (counts_out, counts) = updates_op.new_output();
+        let mut counts_out = OutputBuilder::from(counts_out);
+
         updates_op.build(move |_capability| {
             // Deduplicates updates, and ships novel updates and the counts for each time.
             // For simplicity, this operator ships updates as they are discovered to be new.
@@ -438,7 +445,8 @@ pub mod source {
         );
 
         let mut counts = progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
-        let (mut frontier_out, frontier) = progress_op.new_output();
+        let (frontier_out, frontier) = progress_op.new_output();
+        let mut frontier_out = OutputBuilder::from(frontier_out);
 
         progress_op.build(move |_capability| {
             // Receive progress statements, deduplicated counts. Track lower frontier of both and broadcast changes.
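Every output in this operator now follows the same two-step construction: `new_output()` yields the raw output plus its stream, and an `OutputBuilder` wrapper restores the session-oriented interface. The pattern, extracted from the hunks above as a fragment (`messages_op` is assumed to be an `OperatorBuilder` already in scope):

    use timely::dataflow::operators::generic::OutputBuilder;

    // timely 0.24:
    //     let (mut updates_out, updates) = messages_op.new_output();
    // timely 0.25: take the raw output, then wrap it for session use.
    let (updates_out, updates) = messages_op.new_output();
    let mut updates_out = OutputBuilder::from(updates_out);
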
@@ -554,7 +562,7 @@ pub mod sink {
     use timely::progress::{Antichain, ChangeBatch, Timestamp};
     use timely::dataflow::{Scope, Stream};
     use timely::dataflow::channels::pact::{Exchange, Pipeline};
-    use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};
+    use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder};
 
     use crate::{lattice::Lattice, ExchangeData};
     use super::{Writer, Message, Progress};
@@ -583,7 +591,8 @@ pub mod sink {
         let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
         let reactivator = stream.scope().activator_for(builder.operator_info().address);
         let mut input = builder.new_input(stream, Pipeline);
-        let (mut updates_out, updates) = builder.new_output();
+        let (updates_out, updates) = builder.new_output();
+        let mut updates_out = OutputBuilder::from(updates_out);
 
         builder.build_reschedule(
             move |_capability| {
@@ -650,7 +659,6 @@ pub mod sink {
 
         builder.build_reschedule(|_capabilities| {
             move |frontiers| {
-                let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);
 
                 // We want to drain inputs no matter what.
                 // We could do this after the next step, as we are certain these timestamps will
@@ -667,9 +675,9 @@ pub mod sink {
                 // If our frontier advances strictly, we have the opportunity to issue a progress statement.
                 if <_ as PartialOrder>::less_than(
                     &frontier.borrow(),
-                    &input.frontier.frontier(),
+                    &frontiers[0].frontier(),
                 ) {
-                    let new_frontier = input.frontier.frontier();
+                    let new_frontier = frontiers[0].frontier();
 
                     // Extract the timestamp counts to announce.
                     let mut announce = Vec::new();
@@ -691,7 +699,7 @@ pub mod sink {
                     send_queue.push_back(Message::Progress(progress));
 
                     // Advance our frontier to track our progress utterance.
-                    frontier = input.frontier.frontier().to_owned();
+                    frontier = frontiers[0].frontier().to_owned();
 
                     while let Some(message) = send_queue.front() {
                         if let Some(duration) = sink.poll(message) {
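The sink side drops `FrontieredInputHandle` entirely: the closure given to `build_reschedule` already receives a `frontiers` slice, and 0.25 code reads frontiers off that slice instead of re-bundling the input handle. As a fragment using only names visible in the hunks above:

    builder.build_reschedule(|_capabilities| {
        move |frontiers| {
            // timely 0.24 re-bundled the handle and its frontier:
            //     let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);
            // timely 0.25 consults the slice directly:
            let new_frontier = frontiers[0].frontier();
            // ... drain the input and compare `new_frontier` with held state ...
        }
    });
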
diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs
index 2694e39a9..dba96a62b 100644
--- a/differential-dataflow/src/collection.rs
+++ b/differential-dataflow/src/collection.rs
@@ -496,6 +496,7 @@ impl VecCollection {
     /// to all of the data timestamps).
     pub fn delay(&self, func: F) -> VecCollection
     where
+        G::Timestamp: Hash,
         F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
     {
         let mut func1 = func.clone();
diff --git a/differential-dataflow/src/containers.rs b/differential-dataflow/src/containers.rs
index b8312a0ed..4d3f684e5 100644
--- a/differential-dataflow/src/containers.rs
+++ b/differential-dataflow/src/containers.rs
@@ -268,8 +268,6 @@ impl PushInto<&&T> for TimelyStack {
 }
 
 mod container {
-    use std::ops::Deref;
-
     use columnation::Columnation;
 
     use crate::containers::TimelyStack;
@@ -278,11 +276,6 @@ mod container {
         #[inline] fn record_count(&self) -> i64 { i64::try_from(self.local.len()).unwrap() }
         #[inline] fn is_empty(&self) -> bool { self.local.is_empty() }
     }
-    impl timely::container::IterContainer for TimelyStack {
-        type ItemRef<'a> = &'a T where Self: 'a;
-        type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
-        #[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
-    }
     impl timely::container::DrainContainer for TimelyStack {
         type Item<'a> = &'a T where Self: 'a;
         type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;
diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs
index 95cbc77a5..f77c8c5e4 100644
--- a/differential-dataflow/src/dynamic/mod.rs
+++ b/differential-dataflow/src/dynamic/mod.rs
@@ -16,7 +16,7 @@ pub mod pointstamp;
 
 use timely::dataflow::Scope;
 use timely::order::Product;
 use timely::progress::Timestamp;
-use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use timely::dataflow::operators::generic::{OutputBuilder, builder_rc::OperatorBuilder};
 use timely::dataflow::channels::pact::Pipeline;
 use timely::progress::Antichain;
@@ -42,7 +42,8 @@ where
     pub fn leave_dynamic(&self, level: usize) -> Self {
         // Create a unary operator that will strip all but `level-1` timestamp coordinates.
         let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
-        let (mut output, stream) = builder.new_output();
+        let (output, stream) = builder.new_output();
+        let mut output = OutputBuilder::from(output);
         let mut input = builder.new_input_connection(&self.inner, Pipeline, [(0, Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } }))]);
 
         builder.build(move |_capability| move |_frontier| {
diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs
index e04d6a0f0..26dbb1e07 100644
--- a/differential-dataflow/src/operators/arrange/arrangement.rs
+++ b/differential-dataflow/src/operators/arrange/arrangement.rs
@@ -405,7 +405,7 @@ where
             // Initialize to the minimal input frontier.
             let mut prev_frontier = Antichain::from_elem(::minimum());
 
-            move |input, output| {
+            move |(input, frontier), output| {
 
                 // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
                 // We don't have to keep all capabilities, but we need to be able to form output messages
@@ -422,12 +422,12 @@
                 // and sending smaller bites than we might have otherwise done.
 
                 // Assert that the frontier never regresses.
-                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
+                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
 
                 // Test to see if strict progress has occurred, which happens whenever the new
                 // frontier isn't equal to the previous. It is only in this case that we have any
                 // data processing to do.
-                if prev_frontier.borrow() != input.frontier().frontier() {
+                if prev_frontier.borrow() != frontier.frontier() {
 
                     // There are two cases to handle with some care:
                     //
                     // 1. If any held capabilities are not in advance of the new input frontier,
@@ -441,20 +441,20 @@ where
                     // and feed this to the trace agent (but not along the timely output).
 
                     // If there is at least one capability not in advance of the input frontier ...
-                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+                    if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
 
                         let mut upper = Antichain::new(); // re-used allocation for sealing batches.
 
                         // For each capability not in advance of the input frontier ...
                         for (index, capability) in capabilities.elements().iter().enumerate() {
 
-                            if !input.frontier().less_equal(capability.time()) {
+                            if !frontier.less_equal(capability.time()) {
 
                                 // Assemble the upper bound on times we can commit with this capabilities.
                                 // We must respect the input frontier, and *subsequent* capabilities, as
                                 // we are pretending to retire the capability changes one by one.
                                 upper.clear();
-                                for time in input.frontier().frontier().iter() {
+                                for time in frontier.frontier().iter() {
                                     upper.insert(time.clone());
                                 }
                                 for other_capability in &capabilities.elements()[(index + 1) .. ] {
@@ -490,12 +490,12 @@ where
                     }
                     else {
                         // Announce progress updates, even without data.
-                        let _batch = batcher.seal::(input.frontier().frontier().to_owned());
-                        writer.seal(input.frontier().frontier().to_owned());
+                        let _batch = batcher.seal::(frontier.frontier().to_owned());
+                        writer.seal(frontier.frontier().to_owned());
                     }
 
                     prev_frontier.clear();
-                    prev_frontier.extend(input.frontier().frontier().iter().cloned());
+                    prev_frontier.extend(frontier.frontier().iter().cloned());
                 }
 
                 writer.exert();
diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs
index e5c17e91e..a2aaed772 100644
--- a/differential-dataflow/src/operators/arrange/upsert.rs
+++ b/differential-dataflow/src/operators/arrange/upsert.rs
@@ -176,7 +176,7 @@ where
         let mut priority_queue = BinaryHeap::)>>::new();
         let mut updates = Vec::new();
 
-        move |input, output| {
+        move |(input, frontier), output| {
 
             // Stash capabilities and associated data (ordered by time).
             input.for_each(|cap, data| {
@@ -187,28 +187,28 @@ where
             });
 
             // Assert that the frontier never regresses.
-            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
+            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
 
             // Test to see if strict progress has occurred, which happens whenever the new
             // frontier isn't equal to the previous. It is only in this case that we have any
             // data processing to do.
-            if prev_frontier.borrow() != input.frontier().frontier() {
+            if prev_frontier.borrow() != frontier.frontier() {
 
                 // If there is at least one capability not in advance of the input frontier ...
-                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+                if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
 
                     let mut upper = Antichain::new(); // re-used allocation for sealing batches.
 
                     // For each capability not in advance of the input frontier ...
                     for (index, capability) in capabilities.elements().iter().enumerate() {
 
-                        if !input.frontier().less_equal(capability.time()) {
+                        if !frontier.less_equal(capability.time()) {
 
                             // Assemble the upper bound on times we can commit with this capabilities.
                             // We must respect the input frontier, and *subsequent* capabilities, as
                             // we are pretending to retire the capability changes one by one.
                             upper.clear();
-                            for time in input.frontier().frontier().iter() {
+                            for time in frontier.frontier().iter() {
                                 upper.insert(time.clone());
                             }
                             for other_capability in &capabilities.elements()[(index + 1) .. ] {
@@ -305,12 +305,12 @@ where
                 }
                 else {
                     // Announce progress updates, even without data.
-                    writer.seal(input.frontier().frontier().to_owned());
+                    writer.seal(frontier.frontier().to_owned());
                 }
 
                 // Update our view of the input frontier.
                 prev_frontier.clear();
-                prev_frontier.extend(input.frontier().frontier().iter().cloned());
+                prev_frontier.extend(frontier.frontier().iter().cloned());
 
                 // Downgrade capabilities for `reader_local`.
                 reader_local.set_logical_compaction(prev_frontier.borrow());
diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs
index a554f13b5..2fcf42050 100644
--- a/differential-dataflow/src/operators/count.rs
+++ b/differential-dataflow/src/operators/count.rs
@@ -73,7 +73,7 @@ where
         let mut lower_limit = timely::progress::frontier::Antichain::from_elem(::minimum());
         let mut upper_limit = timely::progress::frontier::Antichain::from_elem(::minimum());
 
-        move |input, output| {
+        move |(input, _frontier), output| {
 
             let mut batch_cursors = Vec::new();
             let mut batch_storage = Vec::new();
diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs
index 0a72976c9..a07098d0e 100644
--- a/differential-dataflow/src/operators/join.rs
+++ b/differential-dataflow/src/operators/join.rs
@@ -5,17 +5,14 @@
 //! + (b * c), and if this is not equal to the former term, little is known about the actual output.
 
 use std::cmp::Ordering;
-use timely::Accountable;
-use timely::container::{ContainerBuilder, PushInto};
+use timely::{Accountable, ContainerBuilder};
+use timely::container::PushInto;
 use timely::order::PartialOrder;
 use timely::progress::Timestamp;
 
 use timely::dataflow::{Scope, StreamCore};
-use timely::dataflow::operators::generic::{Operator, OutputHandleCore};
+use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
 use timely::dataflow::channels::pact::Pipeline;
-use timely::dataflow::channels::pushers::buffer::Session;
-use timely::dataflow::channels::pushers::Counter;
 use timely::dataflow::operators::Capability;
-use timely::dataflow::channels::pushers::tee::Tee;
 
 use crate::hashable::Hashable;
 use crate::{Data, ExchangeData, VecCollection};
@@ -312,13 +309,13 @@ where
 }
 
 /// The session passed to join closures.
-pub type JoinSession<'a, T, CB, C> = Session<'a, T, EffortBuilder, Counter>>;
+pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder, CT>;
 
 /// A container builder that tracks the length of outputs to estimate the effort of join closures.
 #[derive(Default, Debug)]
 pub struct EffortBuilder(pub std::cell::Cell, pub CB);
 
-impl ContainerBuilder for EffortBuilder {
+impl timely::container::ContainerBuilder for EffortBuilder {
     type Container = CB::Container;
 
     #[inline]
@@ -361,7 +358,7 @@ where
     G: Scope,
     T1: TraceReader+Clone+'static,
     T2: for<'a> TraceReader=T1::Key<'a>, Time=T1::Time>+Clone+'static,
-    L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession)+'static,
+    L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession>)+'static,
     CB: ContainerBuilder,
 {
     // Rename traces for symmetry from here on out.
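The most mechanical change in this patch: closures built by `unary_frontier`/`binary_frontier` (and `OperatorBuilder::build`) now receive each input as an `(input, frontier)` pair rather than a handle exposing an `.frontier()` method. A fragment mirroring the arrangement and upsert hunks above (`prev_frontier` and `capabilities` are the surrounding operator state):

    // timely 0.24:
    //     move |input, output| {
    //         assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
    //         ...
    //     }
    // timely 0.25: destructure the pair and consult the frontier directly.
    move |(input, frontier), output| {
        assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
        if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
            // ... seal and ship batches up to `frontier.frontier()` ...
        }
    }
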
@@ -436,7 +433,7 @@ where
     let mut trace1_option = Some(trace1);
     let mut trace2_option = Some(trace2);
 
-    move |input1, input2, output| {
+    move |(input1, frontier1), (input2, frontier2), output| {
 
         // 1. Consuming input.
         //
@@ -561,12 +558,12 @@ where
         // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
         if let Some(trace1) = trace1_option.as_mut() {
-            if input2.frontier().is_empty() { trace1_option = None; }
+            if frontier2.is_empty() { trace1_option = None; }
             else {
                 // Allow `trace1` to compact logically up to the frontier we may yet receive,
                 // in the opposing input (`input2`). All `input2` times will be beyond this
                 // frontier, and joined times only need to be accurate when advanced to it.
-                trace1.set_logical_compaction(input2.frontier().frontier());
+                trace1.set_logical_compaction(frontier2.frontier());
                 // Allow `trace1` to compact physically up to the upper bound of batches we
                 // have received in its input (`input1`). We will not require a cursor that
                 // is not beyond this bound.
@@ -576,12 +573,12 @@ where
         // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
         if let Some(trace2) = trace2_option.as_mut() {
-            if input1.frontier().is_empty() { trace2_option = None;}
+            if frontier1.is_empty() { trace2_option = None;}
             else {
                 // Allow `trace2` to compact logically up to the frontier we may yet receive,
                 // in the opposing input (`input1`). All `input1` times will be beyond this
                 // frontier, and joined times only need to be accurate when advanced to it.
-                trace2.set_logical_compaction(input1.frontier().frontier());
+                trace2.set_logical_compaction(frontier1.frontier());
                 // Allow `trace2` to compact physically up to the upper bound of batches we
                 // have received in its input (`input2`). We will not require a cursor that
                 // is not beyond this bound.
@@ -635,9 +632,9 @@ where
     /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
     #[inline(never)]
-    fn work(&mut self, output: &mut OutputHandleCore, Tee>, mut logic: L, fuel: &mut usize)
+    fn work(&mut self, output: &mut OutputBuilderSession>, mut logic: L, fuel: &mut usize)
     where
-        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession),
+        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession>),
     {
         let meet = self.capability.time();
diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs
index 1079e14d8..59e49c08a 100644
--- a/differential-dataflow/src/operators/reduce.rs
+++ b/differential-dataflow/src/operators/reduce.rs
@@ -365,7 +365,7 @@ where
         let id = trace.stream.scope().index();
 
-        move |input, output| {
+        move |(input, _frontier), output| {
 
             // The `reduce` operator receives fully formed batches, which each serve as an indication
             // that the frontier has advanced to the upper bound of their description.
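`JoinSession` picks up a second lifetime and a capability-like parameter because 0.25's `Session` type did. The shape, as a sketch consistent with the `SessionFor` alias in the dogsdogsdogs hunks below (treat the exact parameter meanings as assumptions):

    use timely::dataflow::operators::Capability;
    use timely::dataflow::operators::generic::Session;

    // Two lifetimes, a timestamp, a container builder, and a capability type.
    type MySession<'a, 'b, T, CB> = Session<'a, 'b, T, CB, Capability<T>>;
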
diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs
index 74a73bba5..91a4039b7 100644
--- a/differential-dataflow/src/operators/threshold.rs
+++ b/differential-dataflow/src/operators/threshold.rs
@@ -123,7 +123,7 @@ where
         let mut lower_limit = timely::progress::frontier::Antichain::from_elem(::minimum());
         let mut upper_limit = timely::progress::frontier::Antichain::from_elem(::minimum());
 
-        move |input, output| {
+        move |(input, _frontier), output| {
 
             let mut batch_cursors = Vec::new();
             let mut batch_storage = Vec::new();
diff --git a/dogsdogsdogs/examples/delta_query2.rs b/dogsdogsdogs/examples/delta_query2.rs
index f6a225968..4b1619b5e 100644
--- a/dogsdogsdogs/examples/delta_query2.rs
+++ b/dogsdogsdogs/examples/delta_query2.rs
@@ -36,8 +36,8 @@ fn main() {
             use differential_dogs3::operators::half_join;
 
             // pick a frontier that will not mislead TOTAL ORDER comparisons.
-            let closure = |time: &Product, antichain: &mut timely::progress::Antichain>| {
-                antichain.insert(Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)));
+            let closure = |time: &Product, antichain: &mut timely::progress::Antichain>| {
+                antichain.insert(Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)));
             };
 
             let path1 =
@@ -70,11 +70,13 @@ fn main() {
         });
 
         i1
-            .session(c1.clone())
+            .activate()
+            .session(&c1)
             .give(((5, 6), Product::new(0, 13), 1));
 
         i2
-            .session(c2.clone())
+            .activate()
+            .session(&c2)
             .give(((5, 7), Product::new(11, 0), 1));
 
     }).unwrap();
diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs
index 4984d8f2a..e202294f4 100644
--- a/dogsdogsdogs/src/lib.rs
+++ b/dogsdogsdogs/src/lib.rs
@@ -181,7 +181,7 @@ where
 
 impl PrefixExtender for CollectionExtender
 where
-    G: Scope,
+    G: Scope,
     K: ExchangeData+Hash+Default,
     V: ExchangeData+Hash+Default,
     P: ExchangeData,
diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs
index 2772e241b..ae0e87409 100644
--- a/dogsdogsdogs/src/operators/count.rs
+++ b/dogsdogsdogs/src/operators/count.rs
@@ -19,7 +19,7 @@ pub fn count(
 ) -> VecCollection
 where
     G: Scope,
-    Tr: TraceReader+Clone+'static,
+    Tr: TraceReader+Clone+'static,
     for<'a> Tr::Diff : Semigroup>,
     K: Hashable + Ord + Default + 'static,
     R: Monoid+Multiply+ExchangeData,
diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs
index adeb0ca4f..a864a5517 100644
--- a/dogsdogsdogs/src/operators/half_join.rs
+++ b/dogsdogsdogs/src/operators/half_join.rs
@@ -35,12 +35,11 @@ use std::collections::HashMap;
 use std::ops::Mul;
 use std::time::Instant;
 
-use timely::container::{CapacityContainerBuilder, ContainerBuilder};
+use timely::ContainerBuilder;
+use timely::container::CapacityContainerBuilder;
 use timely::dataflow::{Scope, ScopeParent, StreamCore};
 use timely::dataflow::channels::pact::{Pipeline, Exchange};
-use timely::dataflow::channels::pushers::buffer::Session;
-use timely::dataflow::channels::pushers::{Counter as PushCounter, Tee};
-use timely::dataflow::operators::Operator;
+use timely::dataflow::operators::{Capability, Operator, generic::Session};
 use timely::progress::Antichain;
 use timely::progress::frontier::AntichainRef;
@@ -86,7 +85,7 @@ where
     K: Hashable + ExchangeData,
     V: ExchangeData,
     R: ExchangeData + Monoid,
-    Tr: TraceReader+Clone+'static,
+    Tr: TraceReader+Clone+'static,
     R: Mul,
     FF: Fn(&G::Timestamp, &mut Antichain) + 'static,
     CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
@@ -107,15 +106,11 @@ where
 /// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
 ///
 /// This is a shorthand primarily for the reson of readability.
-type SessionFor<'a, G, CB> =
-    Session<'a,
+type SessionFor<'a, 'b, G, CB> =
+    Session<'a, 'b,
         ::Timestamp,
         CB,
-        PushCounter<
-            ::Timestamp,
-            ::Container,
-            Tee<::Timestamp, ::Container>
-        >
+        Capability<::Timestamp>,
     >;
 
 /// An unsafe variant of `half_join` where the `output_func` closure takes
@@ -156,7 +151,7 @@ where
     K: Hashable + ExchangeData,
     V: ExchangeData,
     R: ExchangeData + Monoid,
-    Tr: for<'a> TraceReader+Clone+'static,
+    Tr: for<'a> TraceReader+Clone+'static,
     FF: Fn(&G::Timestamp, &mut Antichain) + 'static,
     CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
     Y: Fn(std::time::Instant, usize) -> bool + 'static,
@@ -180,7 +175,7 @@ where
     // Acquire an activator to reschedule the operator when it has unfinished work.
     let activator = stream.scope().activator_for(info.address);
 
-    move |input1, input2, output| {
+    move |(input1, frontier1), (input2, frontier2), output| {
 
         // drain the first input, stashing requests.
         input1.for_each(|capability, data| {
@@ -211,9 +206,9 @@ where
             // Avoid computation if we should already yield.
             // TODO: Verify this is correct for TOTAL ORDER.
             yielded = yielded || yield_function(timer, work);
-            if !yielded && !input2.frontier.less_equal(capability.time()) {
+            if !yielded && !frontier2.less_equal(capability.time()) {
 
-                let frontier = input2.frontier.frontier();
+                let frontier = frontier2.frontier();
 
                 // Update yielded: We can only go from false to {false, true} as
                 // we're checking that `!yielded` holds before entering this block.
@@ -278,7 +273,7 @@ where
             // The logical merging frontier depends on both input1 and stash.
             let mut frontier = timely::progress::frontier::Antichain::new();
-            for time in input1.frontier().frontier().iter() {
+            for time in frontier1.frontier().iter() {
                 frontier_func(time, &mut frontier);
             }
             for time in stash.keys() {
@@ -286,7 +281,7 @@ where
             }
             arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
 
-            if input1.frontier().is_empty() && stash.is_empty() {
+            if frontier1.is_empty() && stash.is_empty() {
                 arrangement_trace = None;
             }
         }
diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs
index c2bf040cd..da02e2bd1 100644
--- a/dogsdogsdogs/src/operators/lookup_map.rs
+++ b/dogsdogsdogs/src/operators/lookup_map.rs
@@ -30,6 +30,7 @@ where
     G: Scope,
     Tr: for<'a> TraceReader<
         KeyOwn = K,
+        Time: std::hash::Hash,
         Diff : Semigroup>+Monoid+ExchangeData,
     >+Clone+'static,
     K: Hashable + Ord + 'static,
@@ -58,7 +59,7 @@ where
     let mut key1: K = supplied_key1;
     let mut key2: K = supplied_key2;
 
-    prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {
+    prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |(input1, frontier1), (input2, frontier2), output| {
 
         // drain the first input, stashing requests.
         input1.for_each(|capability, data| {
@@ -77,7 +78,7 @@ where
 
             // defer requests at incomplete times.
             // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
-            if !input2.frontier.less_equal(capability.time()) {
+            if !frontier2.less_equal(capability.time()) {
 
                 let mut session = output.session(capability);
 
@@ -92,7 +93,7 @@ where
                 // Key container to stage keys for comparison.
                 let mut key_con = Tr::KeyContainer::with_capacity(1);
                 for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
-                    if !input2.frontier.less_equal(time) {
+                    if !frontier2.less_equal(time) {
                         logic2(prefix, &mut key1);
                         key_con.clear(); key_con.push_own(&key1);
                         cursor.seek_key(&storage, key_con.index(1));
@@ -127,7 +128,7 @@ where
 
         // The logical merging frontier depends on both input1 and stash.
         let mut frontier = timely::progress::frontier::Antichain::new();
-        for time in input1.frontier().frontier().to_vec() {
+        for time in frontier1.frontier().to_vec() {
             frontier.insert(time);
         }
         for key in stash.keys() {
@@ -135,7 +136,7 @@ where
         }
         propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
 
-        if input1.frontier().is_empty() && stash.is_empty() {
+        if frontier1.is_empty() && stash.is_empty() {
             propose_trace = None;
         }
diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs
index feeb2eb44..2606285c8 100644
--- a/dogsdogsdogs/src/operators/propose.rs
+++ b/dogsdogsdogs/src/operators/propose.rs
@@ -23,6 +23,7 @@ where
     Tr: for<'a> TraceReader<
         KeyOwn = K,
         ValOwn = V,
+        Time: std::hash::Hash,
         Diff: Monoid+Multiply+ExchangeData+Semigroup>,
     >+Clone+'static,
     K: Hashable + Default + Ord + 'static,
@@ -56,6 +57,7 @@ where
     Tr: for<'a> TraceReader<
         KeyOwn = K,
         ValOwn = V,
+        Time: std::hash::Hash,
         Diff : Semigroup>+Monoid+Multiply+ExchangeData,
     >+Clone+'static,
     K: Hashable + Default + Ord + 'static,
diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs
index 3fd3b3b69..ccb0b34c0 100644
--- a/dogsdogsdogs/src/operators/validate.rs
+++ b/dogsdogsdogs/src/operators/validate.rs
@@ -21,6 +21,7 @@ where
     G: Scope,
     Tr: for<'a> TraceReader<
         KeyOwn = (K, V),
+        Time: std::hash::Hash,
         Diff : Semigroup>+Monoid+Multiply+ExchangeData,
     >+Clone+'static,
     K: Ord+Hash+Clone+Default + 'static,
diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs
index c9bfe590a..999c64e18 100644
--- a/experiments/src/bin/graphs-interactive-alt.rs
+++ b/experiments/src/bin/graphs-interactive-alt.rs
@@ -364,7 +364,7 @@ where G::Timestamp: Lattice+Ord {
 
 fn connected_components(graph: &Arrange) -> VecCollection
-where G::Timestamp: Lattice {
+where G::Timestamp: Lattice + std::hash::Hash {
 
     // each edge (x,y) means that we need at least a label for the min of x and y.
     let nodes =
diff --git a/experiments/src/bin/multitemporal.rs b/experiments/src/bin/multitemporal.rs
index 3a98cab43..e61ef4c5b 100644
--- a/experiments/src/bin/multitemporal.rs
+++ b/experiments/src/bin/multitemporal.rs
@@ -65,12 +65,14 @@ fn main() {
 
         // load initial root.
         root_input
-            .session(root_cap)
+            .activate()
+            .session(&root_cap)
             .give((0, Pair::new(0, 0), 1));
 
         // load initial edges
         edge_input
-            .session(edge_cap.clone())
+            .activate()
+            .session(&edge_cap)
             .give_iterator((0 .. worker_edges).map(|_|
                 ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(0, 0), 1)
             ));
@@ -90,7 +92,8 @@ fn main() {
         for round in 1 .. rounds {
 
             edge_input
-                .session(edge_cap.clone())
+                .activate()
+                .session(&edge_cap)
                 .give_iterator((0 .. worker_batch).flat_map(|_| {
                     let insert = ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(0, round), 1);
                     let remove = ((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)), Pair::new(0, round),-1);
@@ -112,7 +115,8 @@ fn main() {
         edge_cap = edge_cap_next;
 
         edge_input
-            .session(edge_cap.clone())
+            .activate()
+            .session(&edge_cap)
             .give_iterator((0 .. worker_batch).map(|_| {
                 ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(1, 0), 1)
             }));
@@ -124,7 +128,8 @@ fn main() {
         }
 
         edge_input
-            .session(edge_cap.clone())
+            .activate()
+            .session(&edge_cap)
             .give_iterator((0 .. worker_batch).map(|_| {
                 ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(2, 3), 1)
             }));
@@ -136,7 +141,8 @@ fn main() {
         }
 
        edge_input
-            .session(edge_cap.clone())
+            .activate()
+            .session(&edge_cap)
             .give_iterator((0 .. worker_batch).map(|_| {
                 ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(3, 1), 1)
             }));
@@ -148,7 +154,8 @@ fn main() {
         }
 
         edge_input
-            .session(edge_cap0)
+            .activate()
+            .session(&edge_cap0)
             .give_iterator((0 .. worker_batch).map(|_| {
                 ((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Pair::new(0, 10), 1)
             }));
diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs
index a7dd1205d..0e0ece76f 100644
--- a/interactive/src/logging.rs
+++ b/interactive/src/logging.rs
@@ -7,6 +7,7 @@ use timely::communication::Allocate;
 use timely::worker::Worker;
 use timely::logging::TimelyEvent;
 use timely::dataflow::operators::capture::event::EventIterator;
+use timely::dataflow::operators::generic::OutputBuilder;
 
 use differential_dataflow::ExchangeData;
 use differential_dataflow::logging::DifferentialEvent;
@@ -47,13 +48,20 @@ where
         use timely::dataflow::channels::pact::Pipeline;
 
         let mut input = demux.new_input(&input_stream, Pipeline);
-        let (mut operates_out, operates) = demux.new_output();
-        let (mut channels_out, channels) = demux.new_output();
-        let (mut schedule_out, schedule) = demux.new_output();
-        let (mut messages_out, messages) = demux.new_output();
-        let (mut shutdown_out, shutdown) = demux.new_output();
-        let (mut park_out, park) = demux.new_output();
-        let (mut text_out, text) = demux.new_output();
+        let (operates_out, operates) = demux.new_output();
+        let mut operates_out = OutputBuilder::from(operates_out);
+        let (channels_out, channels) = demux.new_output();
+        let mut channels_out = OutputBuilder::from(channels_out);
+        let (schedule_out, schedule) = demux.new_output();
+        let mut schedule_out = OutputBuilder::from(schedule_out);
+        let (messages_out, messages) = demux.new_output();
+        let mut messages_out = OutputBuilder::from(messages_out);
+        let (shutdown_out, shutdown) = demux.new_output();
+        let mut shutdown_out = OutputBuilder::from(shutdown_out);
+        let (park_out, park) = demux.new_output();
+        let mut park_out = OutputBuilder::from(park_out);
+        let (text_out, text) = demux.new_output();
+        let mut text_out = OutputBuilder::from(text_out);
 
         demux.build(move |_capability| {
 
@@ -228,8 +236,10 @@ where
         use timely::dataflow::channels::pact::Pipeline;
 
         let mut input = demux.new_input(&input, Pipeline);
-        let (mut batch_out, batch) = demux.new_output();
-        let (mut merge_out, merge) = demux.new_output();
+        let (batch_out, batch) = demux.new_output();
+        let mut batch_out = OutputBuilder::from(batch_out);
+        let (merge_out, merge) = demux.new_output();
+        let mut merge_out = OutputBuilder::from(merge_out);
 
         demux.build(move |_capability| {
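The demux operators above repeat the take-then-wrap dance once per output, seven times in the worst case. A hypothetical helper macro (not part of the patch) that would compress the boilerplate:

    // Hypothetical: declare an output and immediately wrap it.
    macro_rules! wrapped_output {
        ($op:ident) => {{
            let (raw, stream) = $op.new_output();
            (OutputBuilder::from(raw), stream)
        }};
    }

    // Usage, mirroring the handles above:
    //     let (mut operates_out, operates) = wrapped_output!(demux);
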
"differential-dataflow", default-features = false, version = "0.17.0" } -#timely = { version = "0.24", default-features = false } -columnar = { version = "0.10", default-features = false } -timely = { path = "../timely-dataflow/timely/", default-features = false } +timely = { version = "0.25", default-features = false } +columnar = { version = "0.11", default-features = false } +#timely = { path = "../timely-dataflow/timely/", default-features = false } [profile.release] opt-level = 3 From dc6e12713cd5933e0c9e850141f25343503e40d5 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 23 Oct 2025 12:30:57 -0400 Subject: [PATCH 3/3] Update columnar example to timely 0.25 --- differential-dataflow/examples/columnar.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 14b6253ff..ec5a5f2a3 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -298,7 +298,7 @@ pub mod storage { pub mod val { use std::fmt::Debug; - use columnar::{Container, ContainerOf, Index, Len, Push}; + use columnar::{Borrow, Container, ContainerOf, Index, Len, Push}; use columnar::Vecs; use crate::layout::ColumnarUpdate as Update; @@ -406,7 +406,7 @@ pub mod storage { pub mod key { - use columnar::{Container, ContainerOf, Index, Len, Push}; + use columnar::{Borrow, Container, ContainerOf, Index, Len, Push}; use columnar::Vecs; use crate::layout::ColumnarUpdate as Update; @@ -532,7 +532,7 @@ mod column_builder { self.current.push(item); if self.current.len() > 1024 * 1024 { // TODO: Consolidate the batch? - use columnar::{Container, Index}; + use columnar::{Borrow, Index}; let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let storage = ValStorage::form(refs.into_iter()); @@ -570,7 +570,7 @@ mod column_builder { fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { // TODO: Consolidate the batch? - use columnar::{Container, Index}; + use columnar::{Borrow, Index}; let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let storage = ValStorage::form(refs.into_iter()); @@ -612,7 +612,7 @@ mod column_builder { self.current.push(item); if self.current.len() > 1024 * 1024 { // TODO: Consolidate the batch? - use columnar::{Container, Index}; + use columnar::{Borrow, Index}; let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let storage = KeyStorage::form(refs.into_iter()); @@ -642,7 +642,7 @@ mod column_builder { fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { // TODO: Consolidate the batch? - use columnar::{Container, Index}; + use columnar::{Borrow, Index}; let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); let storage = KeyStorage::form(refs.into_iter()); @@ -763,7 +763,7 @@ pub mod arrangement { pub use batch_container::Coltainer; pub mod batch_container { - use columnar::{Columnar, Container, Clear, Push, Index, Len}; + use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len}; use differential_dataflow::trace::implementations::BatchContainer; /// Container, anchored by `C` to provide an owned type. 
@@ -815,7 +815,7 @@ pub mod arrangement {
     pub mod batcher {
 
         use std::ops::Range;
-        use columnar::{Columnar, Container, Index, Len, Push};
+        use columnar::{Borrow, Columnar, Container, Index, Len, Push};
        use differential_dataflow::trace::implementations::chainless_batcher as chainless;
        use differential_dataflow::difference::{Semigroup, IsZero};
        use timely::progress::frontier::{Antichain, AntichainRef};