7575use std:: collections:: { BinaryHeap , HashMap , VecDeque } ;
7676use std:: cmp:: Reverse ;
7777
78- use columnar:: { Columnar , Len , Index } ;
79- use columnar:: ColumnVec ;
78+ use columnar:: { Len , Index } ;
79+ use columnar:: Vecs ;
8080
8181use crate :: progress:: Timestamp ;
8282use crate :: progress:: { Source , Target } ;
@@ -87,56 +87,43 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
8787use crate :: progress:: timestamp:: PathSummary ;
8888
8989
90- use vec_antichain :: VecAntichain ;
90+ use antichains :: Antichains ;
9191
9292/// A stand-in for `Vec<Antichain<T>>`.
93- mod vec_antichain {
93+ mod antichains {
9494
95- use columnar:: { Columnar , Len , Index , IndexMut } ;
96- use columnar:: { ColumnVec , Slice } ;
95+ use columnar:: { Len , Index , Push } ;
96+ use columnar:: Vecs ;
9797
9898 use crate :: progress:: Antichain ;
9999
100100 #[ derive( Clone , Debug ) ]
101- pub struct VecAntichain < T > ( ColumnVec < T > ) ;
101+ pub struct Antichains < T > ( Vecs < Vec < T > > ) ;
102102
103- impl < TC : Default > Default for VecAntichain < TC > {
103+ impl < T > Default for Antichains < T > {
104104 fn default ( ) -> Self {
105105 Self ( Default :: default ( ) )
106106 }
107107 }
108108
109- impl < TC > Len for VecAntichain < TC > {
109+ impl < T > Len for Antichains < T > {
110110 #[ inline( always) ] fn len ( & self ) -> usize { self . 0 . len ( ) }
111111 }
112112
113- impl < TC > Index for VecAntichain < TC > {
114- type Index < ' a > = Slice < & ' a TC > where TC : ' a ;
115-
113+ impl < T > Push < Antichain < T > > for Antichains < T > {
116114 #[ inline( always) ]
117- fn index ( & self , index : usize ) -> Self :: Index < ' _ > {
118- self . 0 . index ( index)
115+ fn push ( & mut self , item : Antichain < T > ) {
116+ columnar:: Push :: extend ( & mut self . 0 . values , item) ;
117+ self . 0 . bounds . push ( self . 0 . values . len ( ) ) ;
119118 }
120119 }
121- impl < TC > IndexMut for VecAntichain < TC > {
122- type IndexMut < ' a > = Slice < & ' a mut TC > where TC : ' a ;
123120
124- #[ inline( always) ]
125- fn index_mut ( & mut self , index : usize ) -> Self :: IndexMut < ' _ > {
126- self . 0 . index_mut ( index)
127- }
128- }
121+ impl < ' a , T > Index for & ' a Antichains < T > {
122+ type Ref = <& ' a Vecs < Vec < T > > as Index >:: Ref ;
129123
130- impl < T , TC : Columnar < T > > Columnar < Antichain < T > > for VecAntichain < TC > {
131124 #[ inline( always) ]
132- fn copy ( & mut self , item : & Antichain < T > ) {
133- self . 0 . copy ( item. elements ( ) ) ;
134- }
135- fn clear ( & mut self ) {
136- unimplemented ! ( )
137- }
138- fn heap_size ( & self ) -> ( usize , usize ) {
139- unimplemented ! ( )
125+ fn get ( & self , index : usize ) -> Self :: Ref {
126+ ( & self . 0 ) . get ( index)
140127 }
141128 }
142129}
@@ -191,7 +178,7 @@ pub struct Builder<T: Timestamp> {
191178 /// Indexed by operator index, then input port, then output port. This is the
192179 /// same format returned by `get_internal_summary`, as if we simply appended
193180 /// all of the summaries for the hosted nodes.
194- nodes : ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
181+ nodes : Vecs < Vecs < Antichains < T :: Summary > > > ,
195182 /// Direct connections from sources to targets.
196183 ///
197184 /// Edges do not affect timestamps, so we only need to know the connectivity.
@@ -223,6 +210,7 @@ impl<T: Timestamp> Builder<T> {
223210
224211 assert_eq ! ( self . nodes. len( ) , index) ;
225212
213+ use columnar:: Push ;
226214 self . nodes . push ( summary) ;
227215 self . edges . push ( Vec :: new ( ) ) ;
228216 self . shape . push ( ( 0 , 0 ) ) ;
@@ -328,7 +316,7 @@ impl<T: Timestamp> Builder<T> {
328316
329317 // Load edges as default summaries.
330318 for ( index, ports) in self . edges . iter ( ) . enumerate ( ) {
331- for ( output, targets) in ports. iter ( ) . enumerate ( ) {
319+ for ( output, targets) in ( * ports) . iter ( ) . enumerate ( ) {
332320 let source = Location :: new_source ( index, output) ;
333321 in_degree. entry ( source) . or_insert ( 0 ) ;
334322 for & target in targets. iter ( ) {
@@ -339,13 +327,13 @@ impl<T: Timestamp> Builder<T> {
339327 }
340328
341329 // Load default intra-node summaries.
342- for ( index, summary) in self . nodes . iter ( ) . enumerate ( ) {
343- for ( input, outputs) in summary. iter ( ) . enumerate ( ) {
330+ for ( index, summary) in ( & self . nodes ) . into_iter ( ) . enumerate ( ) {
331+ for ( input, outputs) in summary. into_iter ( ) . enumerate ( ) {
344332 let target = Location :: new_target ( index, input) ;
345333 in_degree. entry ( target) . or_insert ( 0 ) ;
346- for ( output, summaries) in outputs. iter ( ) . enumerate ( ) {
334+ for ( output, summaries) in outputs. into_iter ( ) . enumerate ( ) {
347335 let source = Location :: new_source ( index, output) ;
348- for summary in summaries. iter ( ) {
336+ for summary in summaries. into_iter ( ) {
349337 if summary == & Default :: default ( ) {
350338 * in_degree. entry ( source) . or_insert ( 0 ) += 1 ;
351339 }
@@ -380,9 +368,9 @@ impl<T: Timestamp> Builder<T> {
380368 }
381369 } ,
382370 Port :: Target ( port) => {
383- for ( output, summaries) in self . nodes . index ( node) . index ( port) . iter ( ) . enumerate ( ) {
371+ for ( output, summaries) in ( & self . nodes ) . get ( node) . get ( port) . into_iter ( ) . enumerate ( ) {
384372 let source = Location :: new_source ( node, output) ;
385- for summary in summaries. iter ( ) {
373+ for summary in summaries. into_iter ( ) {
386374 if summary == & Default :: default ( ) {
387375 * in_degree. get_mut ( & source) . unwrap ( ) -= 1 ;
388376 if in_degree[ & source] == 0 {
@@ -419,12 +407,12 @@ pub struct Tracker<T:Timestamp> {
419407 /// Indexed by operator index, then input port, then output port. This is the
420408 /// same format returned by `get_internal_summary`, as if we simply appended
421409 /// all of the summaries for the hosted nodes.
422- nodes : ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
410+ nodes : Vecs < Vecs < Antichains < T :: Summary > > > ,
423411 /// Direct connections from sources to targets.
424412 ///
425413 /// Edges do not affect timestamps, so we only need to know the connectivity.
426414 /// Indexed by operator index then output port.
427- edges : ColumnVec < ColumnVec < Vec < Target > > > ,
415+ edges : Vecs < Vecs < Vec < Target > > > ,
428416
429417 // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
430418 // It seems we should be able to flatten most of these so that there are a few allocations
@@ -602,7 +590,8 @@ impl<T:Timestamp> Tracker<T> {
602590 let scope_outputs = builder. shape [ 0 ] . 0 ;
603591 let output_changes = vec ! [ ChangeBatch :: new( ) ; scope_outputs] ;
604592
605- let mut edges: ColumnVec < ColumnVec < Vec < Target > > > = Default :: default ( ) ;
593+ use columnar:: Push ;
594+ let mut edges: Vecs < Vecs < Vec < Target > > > = Default :: default ( ) ;
606595 for edge in builder. edges {
607596 edges. push ( edge) ;
608597 }
@@ -726,10 +715,10 @@ impl<T:Timestamp> Tracker<T> {
726715 . update_iter ( Some ( ( time, diff) ) ) ;
727716
728717 for ( time, diff) in changes {
729- let nodes = & self . nodes . index ( location. node ) . index ( port_index) ;
730- for ( output_port, summaries) in nodes. iter ( ) . enumerate ( ) {
718+ let nodes = & ( & self . nodes ) . get ( location. node ) . get ( port_index) ;
719+ for ( output_port, summaries) in nodes. into_iter ( ) . enumerate ( ) {
731720 let source = Location { node : location. node , port : Port :: Source ( output_port) } ;
732- for summary in summaries. iter ( ) {
721+ for summary in summaries. into_iter ( ) {
733722 if let Some ( new_time) = summary. results_in ( & time) {
734723 self . worklist . push ( Reverse ( ( new_time, source, diff) ) ) ;
735724 }
@@ -749,7 +738,7 @@ impl<T:Timestamp> Tracker<T> {
749738 . update_iter ( Some ( ( time, diff) ) ) ;
750739
751740 for ( time, diff) in changes {
752- for new_target in self . edges . index ( location. node ) . index ( port_index) . iter ( ) {
741+ for new_target in ( & self . edges ) . get ( location. node ) . get ( port_index) . into_iter ( ) {
753742 self . worklist . push ( Reverse ( (
754743 time. clone ( ) ,
755744 Location :: from ( * new_target) ,
@@ -801,14 +790,14 @@ impl<T:Timestamp> Tracker<T> {
801790/// Graph locations may be missing from the output, in which case they have no
802791/// paths to scope outputs.
803792fn summarize_outputs < T : Timestamp > (
804- nodes : & ColumnVec < ColumnVec < VecAntichain < Vec < T :: Summary > > > > ,
793+ nodes : & Vecs < Vecs < Antichains < T :: Summary > > > ,
805794 edges : & Vec < Vec < Vec < Target > > > ,
806795 ) -> HashMap < Location , Vec < Antichain < T :: Summary > > >
807796{
808797 // A reverse edge map, to allow us to walk back up the dataflow graph.
809798 let mut reverse = HashMap :: new ( ) ;
810- for ( node, outputs) in edges . iter ( ) . enumerate ( ) {
811- for ( output, targets) in outputs . iter ( ) . enumerate ( ) {
799+ for ( node, outputs) in columnar :: Index :: into_iter ( edges ) . enumerate ( ) {
800+ for ( output, targets) in columnar :: Index :: into_iter ( outputs ) . enumerate ( ) {
812801 for target in targets. iter ( ) {
813802 reverse. insert (
814803 Location :: from ( * target) ,
@@ -822,10 +811,9 @@ fn summarize_outputs<T: Timestamp>(
822811 let mut worklist = VecDeque :: < ( Location , usize , T :: Summary ) > :: new ( ) ;
823812
824813 let outputs =
825- edges
826- . iter ( )
827- . flat_map ( |x| x. iter ( ) )
828- . flat_map ( |x| x. iter ( ) )
814+ columnar:: Index :: into_iter ( edges)
815+ . flat_map ( |x| columnar:: Index :: into_iter ( x) )
816+ . flat_map ( |x| columnar:: Index :: into_iter ( x) )
829817 . filter ( |target| target. node == 0 ) ;
830818
831819 // The scope may have no outputs, in which case we can do no work.
@@ -843,7 +831,7 @@ fn summarize_outputs<T: Timestamp>(
843831 Port :: Source ( output_port) => {
844832
845833 // Consider each input port of the associated operator.
846- for ( input_port, summaries) in nodes. index ( location. node ) . iter ( ) . enumerate ( ) {
834+ for ( input_port, summaries) in nodes. get ( location. node ) . into_iter ( ) . enumerate ( ) {
847835
848836 // Determine the current path summaries from the input port.
849837 let location = Location { node : location. node , port : Port :: Target ( input_port) } ;
@@ -855,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
855843 while antichains. len ( ) <= output { antichains. push ( Antichain :: new ( ) ) ; }
856844
857845 // Combine each operator-internal summary to the output with `summary`.
858- for operator_summary in summaries. index ( output_port) . iter ( ) {
846+ for operator_summary in summaries. get ( output_port) . into_iter ( ) {
859847 if let Some ( combined) = operator_summary. followed_by ( & summary) {
860848 if antichains[ output] . insert ( combined. clone ( ) ) {
861849 worklist. push_back ( ( location, output, combined) ) ;
0 commit comments