Skip to content

Commit fce4a22

Browse files
Migrate Join logic away from traits (#668)
* Migrate Join logic away from traits * Update mdbook
1 parent 6bd0742 commit fce4a22

31 files changed

Lines changed: 195 additions & 342 deletions

differential-dataflow/examples/arrange.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use timely::scheduling::Scheduler;
66

77
use differential_dataflow::input::Input;
88
use differential_dataflow::AsCollection;
9-
use differential_dataflow::operators::join::JoinCore;
109
use differential_dataflow::operators::Iterate;
1110

1211
fn main() {

differential-dataflow/examples/interpreted.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use timely::dataflow::operators::*;
44

55
use differential_dataflow::VecCollection;
66
use differential_dataflow::lattice::Lattice;
7-
use differential_dataflow::operators::*;
87

98
use graph_map::GraphMMap;
109

differential-dataflow/examples/itembased_cf.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use differential_dataflow::input::InputSession;
2-
use differential_dataflow::operators::{Join,CountTotal};
3-
use differential_dataflow::operators::join::JoinCore;
2+
use differential_dataflow::operators::CountTotal;
43

54
use rand::{Rng, SeedableRng, StdRng};
65

differential-dataflow/examples/monoid-bfs.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use timely::dataflow::operators::probe::Handle;
66

77
use differential_dataflow::input::Input;
88
use differential_dataflow::VecCollection;
9-
use differential_dataflow::operators::*;
109
use differential_dataflow::lattice::Lattice;
1110

1211
type Node = u32;

differential-dataflow/examples/pagerank.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use timely::dataflow::{*, operators::Filter};
33

44
use differential_dataflow::VecCollection;
55
use differential_dataflow::lattice::Lattice;
6-
use differential_dataflow::operators::{*, iterate::Variable};
6+
use differential_dataflow::operators::iterate::Variable;
77
use differential_dataflow::input::InputSession;
88
use differential_dataflow::AsCollection;
99

differential-dataflow/examples/projekt.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use timely::dataflow::operators::probe::Handle;
22

33
use differential_dataflow::input::InputSession;
4-
use differential_dataflow::operators::*;
54

65
fn main() {
76

differential-dataflow/src/algorithms/graphs/bijkstra.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use timely::order::Product;
66
use timely::dataflow::*;
77

88
use crate::{VecCollection, ExchangeData};
9-
use crate::operators::*;
109
use crate::lattice::Lattice;
1110
use crate::operators::iterate::Variable;
1211

differential-dataflow/src/collection.rs

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,163 @@ pub mod vec {
10771077
}
10781078
}
10791079

1080+
impl<G, K, V, R> Collection<G, (K, V), R>
1081+
where
1082+
G: Scope<Timestamp: Lattice+Ord>,
1083+
K: crate::ExchangeData+Hashable,
1084+
V: crate::ExchangeData,
1085+
R: crate::ExchangeData+Semigroup,
1086+
{
1087+
/// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1088+
///
1089+
/// The [`join_map`](Self::join_map) method may be more convenient for non-trivial processing pipelines.
1090+
///
1091+
/// # Examples
1092+
///
1093+
/// ```
1094+
/// use differential_dataflow::input::Input;
1095+
///
1096+
/// ::timely::example(|scope| {
1097+
///
1098+
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1099+
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1100+
/// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1101+
///
1102+
/// x.join(&y)
1103+
/// .assert_eq(&z);
1104+
/// });
1105+
/// ```
1106+
pub fn join<V2, R2>(&self, other: &Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
1107+
where
1108+
K: crate::ExchangeData,
1109+
V2: crate::ExchangeData,
1110+
R2: crate::ExchangeData+Semigroup,
1111+
R: Multiply<R2, Output: Semigroup+'static>,
1112+
{
1113+
self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1114+
}
1115+
1116+
/// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1117+
///
1118+
/// # Examples
1119+
///
1120+
/// ```
1121+
/// use differential_dataflow::input::Input;
1122+
///
1123+
/// ::timely::example(|scope| {
1124+
///
1125+
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1126+
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1127+
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1128+
///
1129+
/// x.join_map(&y, |_key, &a, &b| (a,b))
1130+
/// .assert_eq(&z);
1131+
/// });
1132+
/// ```
1133+
pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
1134+
where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1135+
let arranged1 = self.arrange_by_key();
1136+
let arranged2 = other.arrange_by_key();
1137+
arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1138+
}
1139+
1140+
/// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1141+
///
1142+
/// When the second collection contains frequencies that are either zero or one this is the more traditional
1143+
/// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1144+
/// the counts of the records in the first input.
1145+
///
1146+
/// # Examples
1147+
///
1148+
/// ```
1149+
/// use differential_dataflow::input::Input;
1150+
///
1151+
/// ::timely::example(|scope| {
1152+
///
1153+
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1154+
/// let y = scope.new_collection_from(vec![0, 2]).1;
1155+
/// let z = scope.new_collection_from(vec![(0, 1)]).1;
1156+
///
1157+
/// x.semijoin(&y)
1158+
/// .assert_eq(&z);
1159+
/// });
1160+
/// ```
1161+
pub fn semijoin<R2: crate::ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
1162+
where R: Multiply<R2, Output: Semigroup+'static> {
1163+
let arranged1 = self.arrange_by_key();
1164+
let arranged2 = other.arrange_by_self();
1165+
arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
1166+
}
10801167

1168+
/// Subtracts the semijoin with `other` from `self`.
1169+
///
1170+
/// In the case that `other` has multiplicities zero or one this results
1171+
/// in a relational antijoin, in which we discard input records whose key
1172+
/// is present in `other`. If the multiplicities could be other than zero
1173+
/// or one, the semantic interpretation of this operator is less clear.
1174+
///
1175+
/// In almost all cases, you should ensure that `other` has multiplicities
1176+
/// that are zero or one, perhaps by using the `distinct` operator.
1177+
///
1178+
/// # Examples
1179+
///
1180+
/// ```
1181+
/// use differential_dataflow::input::Input;
1182+
///
1183+
/// ::timely::example(|scope| {
1184+
///
1185+
/// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1186+
/// let y = scope.new_collection_from(vec![0, 2]).1;
1187+
/// let z = scope.new_collection_from(vec![(1, 3)]).1;
1188+
///
1189+
/// x.antijoin(&y)
1190+
/// .assert_eq(&z);
1191+
/// });
1192+
/// ```
1193+
pub fn antijoin<R2: crate::ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
1194+
where R: Multiply<R2, Output=R>, R: Abelian+'static {
1195+
self.concat(&self.semijoin(other).negate())
1196+
}
1197+
1198+
/// Joins this collection with an arranged collection that has the same key type.
1199+
///
1200+
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1201+
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1202+
/// every value returned by the iterator.
1203+
///
1204+
/// This method arranges `self` by key and then joins it against the already-arranged `stream2`, whereas
1205+
/// `join` and `join_map` take a second collection and arrange both inputs by key themselves.
1206+
///
1207+
/// # Examples
1208+
///
1209+
/// ```
1210+
/// use differential_dataflow::input::Input;
1211+
/// use differential_dataflow::trace::Trace;
1212+
///
1213+
/// ::timely::example(|scope| {
1214+
///
1215+
/// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1216+
/// .arrange_by_key();
1217+
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1218+
/// .arrange_by_key();
1219+
///
1220+
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1221+
///
1222+
/// x.join_core(&y, |_key, &a, &b| Some((a, b)))
1223+
/// .assert_eq(&z);
1224+
/// });
1225+
/// ```
1226+
pub fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1227+
where
1228+
Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
1229+
R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1230+
I: IntoIterator<Item: crate::Data>,
1231+
L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1232+
{
1233+
self.arrange_by_key()
1234+
.join_core(stream2, result)
1235+
}
1236+
}
10811237
}
10821238

10831239
/// Conversion to a differential dataflow Collection.

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ where
142142
self.flat_map_ref(move |key, val| Some(logic(key,val)))
143143
}
144144

145+
/// Flattens the stream into a `Collection`.
146+
///
147+
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
148+
/// and this method should only be used when the data need to be transformed or exchanged, rather than
149+
/// supplied as arguments to an operator using the same key-value structure.
150+
pub fn as_vecs(&self) -> VecCollection<G, (Tr::KeyOwn, Tr::ValOwn), Tr::Diff>
151+
where
152+
Tr::KeyOwn: crate::ExchangeData,
153+
Tr::ValOwn: crate::ExchangeData,
154+
{
155+
self.flat_map_ref(move |key, val| [(Tr::owned_key(key), Tr::owned_val(val))])
156+
}
157+
145158
/// Extracts elements from an arrangement as a collection.
146159
///
147160
/// The supplied logic may produce an iterator over output values, allowing either
@@ -198,30 +211,22 @@ where
198211
G: Scope<Timestamp=T1::Time>,
199212
T1: TraceReader + Clone + 'static,
200213
{
201-
/// A direct implementation of the `JoinCore::join_core` method.
214+
/// A convenience method to join and produce `VecCollection` output.
215+
///
216+
/// Avoid this method, as it is likely to evolve into one without the `VecCollection` opinion.
202217
pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> VecCollection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
203218
where
204219
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
205220
T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
206221
I: IntoIterator<Item: Data>,
207222
L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
208223
{
209-
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
224+
let mut result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
210225
let t = t.clone();
211226
let r = (r1.clone()).multiply(r2);
212227
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
213228
};
214-
self.join_core_internal_unsafe(other, result)
215-
}
216-
/// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
217-
pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> VecCollection<G,D,ROut>
218-
where
219-
T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
220-
D: Data,
221-
ROut: Semigroup+'static,
222-
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
223-
L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
224-
{
229+
225230
use crate::operators::join::join_traces;
226231
join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
227232
self,

0 commit comments

Comments
 (0)