Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- windows
toolchain:
- stable
- 1.85
- 1.86
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
19 changes: 10 additions & 9 deletions mdbook/src/chapter_2/chapter_2_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ use timely::dataflow::operators::{ToStream, Partition, Inspect};

fn main() {
timely::example(|scope| {
let streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));
let mut streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));

streams[0].inspect(|x| println!("seen 0: {:?}", x));
streams[1].inspect(|x| println!("seen 1: {:?}", x));
streams[2].inspect(|x| println!("seen 2: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
});
}
```
Expand All @@ -147,11 +147,12 @@ use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};

fn main() {
timely::example(|scope| {
let streams = (0..10).to_stream(scope)
let mut streams = (0..10).to_stream(scope)
.partition(3, |x| (x % 3, x));
streams[0]
.concat(&streams[1])
.concat(&streams[2])
streams
.pop().unwrap()
.concat(streams.pop().unwrap())
.concat(streams.pop().unwrap())
.inspect(|x| println!("seen: {:?}", x));
});
}
Expand Down
4 changes: 2 additions & 2 deletions mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn main() {
let in1 = (0 .. 10).to_stream(scope);
let in2 = (0 .. 10).to_stream(scope);

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut notificator = FrontierNotificator::default();
let mut stash = HashMap::new();
Expand Down Expand Up @@ -233,7 +233,7 @@ fn main() {
let in1 = (0 .. 10).to_stream(scope);
let in2 = (0 .. 10).to_stream(scope);

in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {
in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

let mut stash = HashMap::new();

Expand Down
14 changes: 7 additions & 7 deletions mdbook/src/chapter_4/chapter_4_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
// circulate numbers, Collatz stepping each time.
(1 .. 10)
.to_stream(scope)
.concat(&stream)
.concat(stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
.inspect(|x| println!("{:?}", x))
.filter(|x| *x != 1)
Expand Down Expand Up @@ -63,17 +63,17 @@ fn main() {
let results1 = stream1.map(|x| 3 * x + 1);

// partition the input and feedback streams by even-ness.
let parts =
let mut parts =
(1 .. 10)
.to_stream(scope)
.concat(&results0)
.concat(&results1)
.concat(results0)
.concat(results1)
.inspect(|x| println!("{:?}", x))
.partition(2, |x| (x % 2, x));

// connect each part appropriately.
parts[0].connect_loop(handle0);
parts[1].connect_loop(handle1);
parts.pop().unwrap().connect_loop(handle1);
parts.pop().unwrap().connect_loop(handle0);
});
}
```
Expand Down Expand Up @@ -103,7 +103,7 @@ fn main() {

input
.enter(subscope)
.concat(&stream)
.concat(stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
.inspect(|x| println!("{:?}", x))
.filter(|x| *x != 1)
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_4/chapter_4_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn main() {
// Assign timestamps to records so that not much work is in each time.
.delay(|number, time| number / 100 )
// Buffer records until all prior timestamps have completed.
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
.binary_frontier(cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {

move |(input1, frontier1), (input2, frontier2), output| {

Expand Down
4 changes: 2 additions & 2 deletions timely/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {

// use the stream of edges
graph.binary_notify(
&stream,
stream,
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
"BFS",
Expand Down Expand Up @@ -130,7 +130,7 @@ fn main() {
});
}
)
.concat(&(0..1).map(|x| (x,x)).to_stream(scope))
.concat((0..1).map(|x| (x,x)).to_stream(scope))
.connect_loop(handle);
});
}).unwrap(); // asserts error-free execution;
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn main() {
let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

stream1
.binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
.binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {

let mut map1 = HashMap::<u64, Vec<u64>>::new();
let mut map2 = HashMap::<u64, Vec<u64>>::new();
Expand Down
6 changes: 3 additions & 3 deletions timely/examples/loopdemo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ fn main() {

let step =
stream
.concat(&loop_stream)
.concat(loop_stream)
.map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
.filter(|x| x > &1);

step.connect_loop(loop_handle);
step.probe_with(&probe);
step.probe_with(&probe)
.connect_loop(loop_handle);
});

let ns_per_request = 1_000_000_000 / rate;
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {

// bring edges and ranks together!
let changes = edge_stream.binary_frontier(
&rank_stream,
rank_stream,
Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
Exchange::new(|x: &(usize, i64)| x.0 as u64),
"PageRank",
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() {
(0 .. elements)
.filter(move |&x| (x as usize) % peers == index)
.to_stream(scope)
.concat(&cycle)
.concat(cycle)
.exchange(|&x| x)
.map_in_place(|x| *x += 1)
.branch_when(move |t| t < &iterations).1
Expand Down
4 changes: 2 additions & 2 deletions timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ fn main() {
}

trait UnionFind {
fn union_find(&self) -> Self;
fn union_find(self) -> Self;
}

impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
fn union_find(&self) -> Stream<G, (usize, usize)> {
fn union_find(self) -> Stream<G, (usize, usize)> {

self.unary(Pipeline, "UnionFind", |_,_| {

Expand Down
17 changes: 9 additions & 8 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use serde::{Deserialize, Serialize};
use crate::communication::Push;
use crate::Container;

/// A collection of types that may be pushed at.
pub mod pushers;
Expand Down Expand Up @@ -32,19 +31,21 @@ impl<T, C> Message<T, C> {
}
}

impl<T, C: Container> Message<T, C> {
impl<T, C> Message<T, C> {
/// Creates a new message instance from arguments.
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
Message { time, data, from, seq }
///
/// Zero values are installed for `from` and `seq`, and are meant to be populated by `LogPusher`.
pub fn new(time: T, data: C) -> Self {
Message { time, data, from: 0, seq: 0 }
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element. The buffer is left in an undefined state.
/// Forms a message from borrowed parts, and replaces `buffer` with what is left by the `push` call.
/// If the pusher returns nothing, then `buffer` is set to the default for the container.
#[inline]
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) where C: Default {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let message = Message::new(time, data);
let mut bundle = Some(message);

pusher.push(&mut bundle);
Expand Down
Loading
Loading