Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,9 @@ pub mod storage {
impl<U: Update> ValStorage<U> {

/// Forms `Self` from sorted update tuples.
pub fn form<'a>(sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
pub fn form<'a>(mut sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {

let mut output = Self::default();
let mut sorted = sorted.peekable();

if let Some((key,val,time,diff)) = sorted.next() {
output.keys.push(key);
Expand Down Expand Up @@ -429,14 +428,11 @@ pub mod storage {
impl<U: Update> KeyStorage<U> {

/// Forms `Self` from sorted update tuples.
pub fn form<'a>(sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
pub fn form<'a>(mut sorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {

let mut keys: ContainerOf<U::Key> = Default::default();
let mut upds: Vecs<(ContainerOf<U::Time>, ContainerOf<U::Diff>)> = Default::default();

// let mut output = Self::default();
let mut sorted = sorted.peekable();

if let Some((key,time,diff)) = sorted.next() {
keys.push(key);
upds.values.push((time, diff));
Expand Down Expand Up @@ -666,7 +662,6 @@ mod distributor {
use std::rc::Rc;

use columnar::{Index, Len};
use timely::container::{ContainerBuilder, PushInto};
use timely::logging::TimelyLogger;
use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor};
use timely::dataflow::channels::Message;
Expand All @@ -675,34 +670,39 @@ mod distributor {
use timely::worker::AsWorker;

use crate::layout::ColumnarUpdate as Update;
use crate::{KeyColBuilder, KeyStorage};
use crate::KeyStorage;

pub struct KeyDistributor<U: Update, H> {
builders: Vec<KeyColBuilder<U>>,
marker: std::marker::PhantomData<U>,
hashfunc: H,
}

impl<U: Update, H: for<'a> FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor<KeyStorage<U>> for KeyDistributor<U, H> {
fn partition<T: Clone, P: timely::communication::Push<Message<T, KeyStorage<U>>>>(&mut self, container: &mut KeyStorage<U>, time: &T, pushers: &mut [P]) {
// For each key, pick a target index and copy its (key, time, diff) tuples into the corresponding entry of `self.builders`.
for index in 0 .. container.keys.borrow().len() {
let key = container.keys.borrow().get(index);
let idx = ((self.hashfunc)(key) as usize) % self.builders.len();
for (t, diff) in container.upds.borrow().get(index).into_index_iter() {
self.builders[idx].push_into((key, t, diff));
}
while let Some(produced) = self.builders[idx].extract() {
Message::push_at(produced, time.clone(), &mut pushers[idx]);
}

use columnar::{ContainerOf, Vecs, Container, Push};
use crate::Column;

let in_keys = container.keys.borrow();
let in_upds = container.upds.borrow();

// Build one output container per pusher: pick each key's target with `self.hashfunc`, then copy the key and its associated updates straight into that target's container.
// This bypasses the container builders, which do considerable work converting tuples into columnar containers; skipping that round trip saves time.
let mut out_keys = vec![ContainerOf::<U::Key>::default(); pushers.len()];
let mut out_upds = vec![Vecs::<(ContainerOf::<U::Time>, ContainerOf::<U::Diff>)>::default(); pushers.len()];
for index in 0 .. in_keys.len() {
let key = in_keys.get(index);
let idx = ((self.hashfunc)(key) as usize) % pushers.len();
out_keys[idx].push(key);
out_upds[idx].extend_from_self(in_upds, index..index+1);
}
}
fn flush<T: Clone, P: timely::communication::Push<Message<T, KeyStorage<U>>>>(&mut self, time: &T, pushers: &mut [P]) {
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
while let Some(container) = builder.finish() {
Message::push_at(container, time.clone(), pusher);
}

for ((pusher, keys), upds) in pushers.iter_mut().zip(out_keys).zip(out_upds) {
let mut container = KeyStorage { keys: Column::Typed(keys), upds: Column::Typed(upds) };
Message::push_at(&mut container, time.clone(), pusher);
}
}
fn flush<T: Clone, P: timely::communication::Push<Message<T, KeyStorage<U>>>>(&mut self, _time: &T, _pushers: &mut [P]) { }
fn relax(&mut self) { }
}

Expand All @@ -726,7 +726,7 @@ mod distributor {
let (senders, receiver) = allocator.allocate::<Message<T, KeyStorage<U>>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = KeyDistributor {
builders: std::iter::repeat_with(Default::default).take(allocator.peers()).collect(),
marker: std::marker::PhantomData,
hashfunc: self.hashfunc,
};
(Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub mod layers {

impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
/// Lower and upper bounds in `self.vals` of the indexed list.
pub fn bounds(&self, index: usize) -> (usize, usize) {
#[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
(self.offs.index(index), self.offs.index(index+1))
}
/// Retrieves a value using relative indexes.
Expand Down