@@ -26,7 +26,7 @@ pub struct Hydroflow {
2626
2727 // TODO(mingwei): separate scheduler into its own struct/trait?
2828 // Index is stratum, value is FIFO queue for that stratum.
29- ready_queue : Vec < VecDeque < SubgraphId > > ,
29+ stratum_queues : Vec < VecDeque < SubgraphId > > ,
3030 current_stratum : usize ,
3131 current_epoch : usize ,
3232
@@ -35,14 +35,14 @@ pub struct Hydroflow {
3535}
3636impl Default for Hydroflow {
3737 fn default ( ) -> Self {
38- let ( subgraphs, handoffs, states, ready_queue ) = Default :: default ( ) ;
38+ let ( subgraphs, handoffs, states, stratum_queues ) = Default :: default ( ) ;
3939 let ( event_queue_send, event_queue_recv) = mpsc:: unbounded_channel ( ) ;
4040 Self {
4141 subgraphs,
4242 handoffs,
4343 states,
4444
45- ready_queue ,
45+ stratum_queues ,
4646 current_stratum : 0 ,
4747 current_epoch : 0 ,
4848
@@ -84,7 +84,7 @@ impl Hydroflow {
8484 // Add any external jobs to ready queue.
8585 self . try_recv_events ( ) ;
8686
87- while let Some ( sg_id) = self . ready_queue [ self . current_stratum ] . pop_front ( ) {
87+ while let Some ( sg_id) = self . stratum_queues [ self . current_stratum ] . pop_front ( ) {
8888 {
8989 let sg_data = & mut self . subgraphs [ sg_id] ;
9090 // This must be true for the subgraph to be enqueued.
@@ -111,7 +111,7 @@ impl Hydroflow {
111111 continue ;
112112 }
113113 succ_sg_data. is_scheduled . set ( true ) ;
114- self . ready_queue [ succ_sg_data. stratum ] . push_back ( succ_id) ;
114+ self . stratum_queues [ succ_sg_data. stratum ] . push_back ( succ_id) ;
115115 }
116116 }
117117 }
@@ -128,12 +128,12 @@ impl Hydroflow {
128128 let old_stratum = self . current_stratum ;
129129 loop {
130130 // If current stratum has work, return true.
131- if !self . ready_queue [ self . current_stratum ] . is_empty ( ) {
131+ if !self . stratum_queues [ self . current_stratum ] . is_empty ( ) {
132132 return true ;
133133 }
134134 // Increment stratum counter.
135135 self . current_stratum += 1 ;
136- if self . current_stratum >= self . ready_queue . len ( ) {
136+ if self . current_stratum >= self . stratum_queues . len ( ) {
137137 self . current_stratum = 0 ;
138138 self . current_epoch += 1 ;
139139 }
@@ -175,7 +175,7 @@ impl Hydroflow {
175175 while let Ok ( sg_id) = self . event_queue_recv . try_recv ( ) {
176176 let sg_data = & self . subgraphs [ sg_id] ;
177177 if !sg_data. is_scheduled . replace ( true ) {
178- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
178+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
179179 enqueued_count += 1 ;
180180 }
181181 }
@@ -189,7 +189,7 @@ impl Hydroflow {
189189 let sg_id = self . event_queue_recv . blocking_recv ( ) ?;
190190 let sg_data = & self . subgraphs [ sg_id] ;
191191 if !sg_data. is_scheduled . replace ( true ) {
192- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
192+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
193193
194194 // Enqueue any other immediate events.
195195 return Some ( NonZeroUsize :: new ( self . try_recv_events ( ) + 1 ) . unwrap ( ) ) ;
@@ -204,7 +204,7 @@ impl Hydroflow {
204204 let sg_id = self . event_queue_recv . recv ( ) . await ?;
205205 let sg_data = & self . subgraphs [ sg_id] ;
206206 if !sg_data. is_scheduled . replace ( true ) {
207- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
207+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
208208
209209 // Enqueue any other immediate events.
210210 return Some ( NonZeroUsize :: new ( self . try_recv_events ( ) + 1 ) . unwrap ( ) ) ;
@@ -265,7 +265,7 @@ impl Hydroflow {
265265 true ,
266266 ) ) ;
267267 self . init_stratum ( stratum) ;
268- self . ready_queue [ stratum] . push_back ( sg_id) ;
268+ self . stratum_queues [ stratum] . push_back ( sg_id) ;
269269
270270 sg_id
271271 }
@@ -354,15 +354,16 @@ impl Hydroflow {
354354 true ,
355355 ) ) ;
356356 self . init_stratum ( stratum) ;
357- self . ready_queue [ stratum] . push_back ( sg_id) ;
357+ self . stratum_queues [ stratum] . push_back ( sg_id) ;
358358
359359 sg_id
360360 }
361361
362362 /// Makes sure stratum STRATUM is initialized.
363363 fn init_stratum ( & mut self , stratum : usize ) {
364- if self . ready_queue . len ( ) <= stratum {
365- self . ready_queue . resize_with ( stratum + 1 , Default :: default) ;
364+ if self . stratum_queues . len ( ) <= stratum {
365+ self . stratum_queues
366+ . resize_with ( stratum + 1 , Default :: default) ;
366367 }
367368 }
368369
@@ -457,7 +458,7 @@ struct SubgraphData {
457458 #[ allow( dead_code) ]
458459 preds : Vec < HandoffId > ,
459460 succs : Vec < HandoffId > ,
460- /// If this subgraph is scheduled in [`Hydroflow::ready_queue `].
461+ /// If this subgraph is scheduled in [`Hydroflow::stratum_queues `].
461462 /// [`Cell`] allows modifying this field when iterating `Self::preds` or
462463 /// `Self::succs`, as all `SubgraphData` are owned by the same vec
463464 /// `Hydroflow::subgraphs`.
0 commit comments