22
33use crate :: dataflow:: channels:: pact:: Pipeline ;
44use crate :: dataflow:: operators:: generic:: builder_rc:: OperatorBuilder ;
5- use crate :: dataflow:: { Scope , OwnedStream , StreamLike } ;
5+ use crate :: dataflow:: { Scope , Stream , StreamCore } ;
66use crate :: Container ;
77
88/// Extension trait for `Stream`.
@@ -31,14 +31,14 @@ pub trait Branch<S: Scope, D> {
3131 fn branch (
3232 self ,
3333 condition : impl Fn ( & S :: Timestamp , & D ) -> bool + ' static ,
34- ) -> ( OwnedStream < S , Vec < D > > , OwnedStream < S , Vec < D > > ) ;
34+ ) -> ( Stream < S , D > , Stream < S , D > ) ;
3535}
3636
37- impl < G : Scope , D : ' static , S : StreamLike < G , Vec < D > > > Branch < G , D > for S {
37+ impl < S : Scope , D : ' static > Branch < S , D > for Stream < S , D > {
3838 fn branch (
3939 self ,
40- condition : impl Fn ( & G :: Timestamp , & D ) -> bool + ' static ,
41- ) -> ( OwnedStream < G , Vec < D > > , OwnedStream < G , Vec < D > > ) {
40+ condition : impl Fn ( & S :: Timestamp , & D ) -> bool + ' static ,
41+ ) -> ( Stream < S , D > , Stream < S , D > ) {
4242 let mut builder = OperatorBuilder :: new ( "Branch" . to_owned ( ) , self . scope ( ) ) ;
4343
4444 let mut input = builder. new_input ( self , Pipeline ) ;
@@ -69,7 +69,7 @@ impl<G: Scope, D: 'static, S: StreamLike<G, Vec<D>>> Branch<G, D> for S {
6969}
7070
7171/// Extension trait for `Stream`.
72- pub trait BranchWhen < G : Scope , C : Container > : Sized {
72+ pub trait BranchWhen < T > : Sized {
7373 /// Takes one input stream and splits it into two output streams.
7474 /// For each time, the supplied closure is called. If it returns `true`,
7575 /// the records for that will be sent to the second returned stream, otherwise
@@ -89,11 +89,11 @@ pub trait BranchWhen<G: Scope, C: Container>: Sized {
8989 /// after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
9090 /// });
9191 /// ```
92- fn branch_when ( self , condition : impl Fn ( & G :: Timestamp ) -> bool + ' static ) -> ( OwnedStream < G , C > , OwnedStream < G , C > ) ;
92+ fn branch_when ( self , condition : impl Fn ( & T ) -> bool + ' static ) -> ( Self , Self ) ;
9393}
9494
95- impl < G : Scope , C : Container + ' static , S : StreamLike < G , C > > BranchWhen < G , C > for S {
96- fn branch_when ( self , condition : impl Fn ( & G :: Timestamp ) -> bool + ' static ) -> ( OwnedStream < G , C > , OwnedStream < G , C > ) {
95+ impl < S : Scope , C : Container + ' static > BranchWhen < S :: Timestamp > for StreamCore < S , C > {
96+ fn branch_when ( self , condition : impl Fn ( & S :: Timestamp ) -> bool + ' static ) -> ( Self , Self ) {
9797 let mut builder = OperatorBuilder :: new ( "Branch" . to_owned ( ) , self . scope ( ) ) ;
9898
9999 let mut input = builder. new_input ( self , Pipeline ) ;
0 commit comments