From d46dc2e287668a10d9e36f7dad622b05750113c2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 17 Nov 2025 10:33:33 -0500 Subject: [PATCH] Exchange columns more efficiently --- differential-dataflow/examples/columnar.rs | 52 +++++++++---------- .../src/trace/implementations/ord_neu.rs | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index ec5a5f2a3..bf502ce15 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -323,10 +323,9 @@ pub mod storage { impl ValStorage { /// Forms `Self` from sorted update tuples. - pub fn form<'a>(sorted: impl Iterator>>) -> Self { + pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { let mut output = Self::default(); - let mut sorted = sorted.peekable(); if let Some((key,val,time,diff)) = sorted.next() { output.keys.push(key); @@ -429,14 +428,11 @@ pub mod storage { impl KeyStorage { /// Forms `Self` from sorted update tuples. - pub fn form<'a>(sorted: impl Iterator>>) -> Self { + pub fn form<'a>(mut sorted: impl Iterator>>) -> Self { let mut keys: ContainerOf = Default::default(); let mut upds: Vecs<(ContainerOf, ContainerOf)> = Default::default(); - // let mut output = Self::default(); - let mut sorted = sorted.peekable(); - if let Some((key,time,diff)) = sorted.next() { keys.push(key); upds.values.push((time, diff)); @@ -666,7 +662,6 @@ mod distributor { use std::rc::Rc; use columnar::{Index, Len}; - use timely::container::{ContainerBuilder, PushInto}; use timely::logging::TimelyLogger; use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; use timely::dataflow::channels::Message; @@ -675,34 +670,39 @@ mod distributor { use timely::worker::AsWorker; use crate::layout::ColumnarUpdate as Update; - use crate::{KeyColBuilder, KeyStorage}; + use crate::KeyStorage; pub struct KeyDistributor { - builders: Vec>, + marker: std::marker::PhantomData, hashfunc: H, } impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for KeyDistributor { fn partition>>>(&mut self, container: &mut KeyStorage, time: &T, pushers: &mut [P]) { - // For each key, partition and copy (key, time, diff) into the appropriate self.builder. - for index in 0 .. container.keys.borrow().len() { - let key = container.keys.borrow().get(index); - let idx = ((self.hashfunc)(key) as usize) % self.builders.len(); - for (t, diff) in container.upds.borrow().get(index).into_index_iter() { - self.builders[idx].push_into((key, t, diff)); - } - while let Some(produced) = self.builders[idx].extract() { - Message::push_at(produced, time.clone(), &mut pushers[idx]); - } + + use columnar::{ContainerOf, Vecs, Container, Push}; + use crate::Column; + + let in_keys = container.keys.borrow(); + let in_upds = container.upds.borrow(); + + // We build bespoke containers by determining the target for each key using `self.hashfunc`, and then copying in key and associated data. + // We bypass the container builders, which do much work to go from tuples to columnar containers, and we save time by avoiding that round trip. + let mut out_keys = vec![ContainerOf::::default(); pushers.len()]; + let mut out_upds = vec![Vecs::<(ContainerOf::, ContainerOf::)>::default(); pushers.len()]; + for index in 0 .. in_keys.len() { + let key = in_keys.get(index); + let idx = ((self.hashfunc)(key) as usize) % pushers.len(); + out_keys[idx].push(key); + out_upds[idx].extend_from_self(in_upds, index..index+1); } - } - fn flush>>>(&mut self, time: &T, pushers: &mut [P]) { - for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) { - while let Some(container) = builder.finish() { - Message::push_at(container, time.clone(), pusher); - } + + for ((pusher, keys), upds) in pushers.iter_mut().zip(out_keys).zip(out_upds) { + let mut container = KeyStorage { keys: Column::Typed(keys), upds: Column::Typed(upds) }; + Message::push_at(&mut container, time.clone(), pusher); } } + fn flush>>>(&mut self, _time: &T, _pushers: &mut [P]) { } fn relax(&mut self) { } } @@ -726,7 +726,7 @@ mod distributor { let (senders, receiver) = allocator.allocate::>>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); let distributor = KeyDistributor { - builders: std::iter::repeat_with(Default::default).take(allocator.peers()).collect(), + marker: std::marker::PhantomData, hashfunc: self.hashfunc, }; (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index b6e57ab0b..b6f9a198e 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -89,7 +89,7 @@ pub mod layers { impl BatchContainer = usize>, V: BatchContainer> Vals { /// Lower and upper bounds in `self.vals` of the indexed list. - pub fn bounds(&self, index: usize) -> (usize, usize) { + #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) { (self.offs.index(index), self.offs.index(index+1)) } /// Retrieves a value using relative indexes.