@@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
134134 }
135135}
136136
137+ impl < T > Errors < T > {
138+ pub fn num_errors ( & self ) -> usize {
139+ self . 0 . len ( )
140+ }
141+ }
142+
137143/// Reasons why a candidate might not be ready.
138144#[ derive( Debug , Clone , Copy ) ]
139145pub enum CandidateError {
@@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
599605 F : Fn ( & ' a BeaconNodeHttpClient ) -> R ,
600606 R : Future < Output = Result < O , Err > > ,
601607 {
602- let mut results = vec ! [ ] ;
603608 let mut to_retry = vec ! [ ] ;
604609 let mut retry_unsynced = vec ! [ ] ;
605610
606611 // Run `func` using a `candidate`, returning the value or capturing errors.
607- //
608- // We use a macro instead of a closure here since it is not trivial to move `func` into a
609- // closure.
610- macro_rules! try_func {
611- ( $candidate: ident) => { {
612- inc_counter_vec( & ENDPOINT_REQUESTS , & [ $candidate. beacon_node. as_ref( ) ] ) ;
612+ let run_on_candidate = |candidate : & ' a CandidateBeaconNode < E > | async {
613+ inc_counter_vec ( & ENDPOINT_REQUESTS , & [ candidate. beacon_node . as_ref ( ) ] ) ;
613614
614- // There exists a race condition where `func` may be called when the candidate is
615- // actually not ready. We deem this an acceptable inefficiency.
616- match func( & $candidate. beacon_node) . await {
617- Ok ( val) => results. push( Ok ( val) ) ,
618- Err ( e) => {
619- // If we have an error on this function, make the client as not-ready.
620- //
621- // There exists a race condition where the candidate may have been marked
622- // as ready between the `func` call and now. We deem this an acceptable
623- // inefficiency.
624- if matches!( offline_on_failure, OfflineOnFailure :: Yes ) {
625- $candidate. set_offline( ) . await ;
626- }
627- results. push( Err ( (
628- $candidate. beacon_node. to_string( ) ,
629- Error :: RequestFailed ( e) ,
630- ) ) ) ;
631- inc_counter_vec( & ENDPOINT_ERRORS , & [ $candidate. beacon_node. as_ref( ) ] ) ;
615+ // There exists a race condition where `func` may be called when the candidate is
616+ // actually not ready. We deem this an acceptable inefficiency.
617+ match func ( & candidate. beacon_node ) . await {
618+ Ok ( val) => Ok ( val) ,
619+ Err ( e) => {
620+ // If we have an error on this function, mark the client as not-ready.
621+ //
622+ // There exists a race condition where the candidate may have been marked
623+ // as ready between the `func` call and now. We deem this an acceptable
624+ // inefficiency.
625+ if matches ! ( offline_on_failure, OfflineOnFailure :: Yes ) {
626+ candidate. set_offline ( ) . await ;
632627 }
628+ inc_counter_vec ( & ENDPOINT_ERRORS , & [ candidate. beacon_node . as_ref ( ) ] ) ;
629+ Err ( ( candidate. beacon_node . to_string ( ) , Error :: RequestFailed ( e) ) )
633630 }
634- } } ;
635- }
631+ }
632+ } ;
636633
637634 // First pass: try `func` on all synced and ready candidates.
638635 //
639636 // This ensures that we always choose a synced node if it is available.
637+ let mut first_batch_futures = vec ! [ ] ;
640638 for candidate in & self . candidates {
641639 match candidate. status ( RequireSynced :: Yes ) . await {
640+ Ok ( _) => {
641+ first_batch_futures. push ( run_on_candidate ( candidate) ) ;
642+ }
642643 Err ( CandidateError :: NotSynced ) if require_synced == false => {
643644 // This client is unsynced we will try it after trying all synced clients
644645 retry_unsynced. push ( candidate) ;
@@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
647648 // This client was not ready on the first pass, we might try it again later.
648649 to_retry. push ( candidate) ;
649650 }
650- Ok ( _) => try_func ! ( candidate) ,
651651 }
652652 }
653+ let first_batch_results = futures:: future:: join_all ( first_batch_futures) . await ;
653654
654655 // Second pass: try `func` on ready unsynced candidates. This only runs if we permit
655656 // unsynced candidates.
656657 //
657658 // Due to async race-conditions, it is possible that we will send a request to a candidate
658659 // that has been set to an offline/unready status. This is acceptable.
659- if require_synced == false {
660- for candidate in retry_unsynced {
661- try_func ! ( candidate ) ;
662- }
663- }
660+ let second_batch_results = if require_synced == false {
661+ futures :: future :: join_all ( retry_unsynced. into_iter ( ) . map ( run_on_candidate ) ) . await
662+ } else {
663+ vec ! [ ]
664+ } ;
664665
665666 // Third pass: try again, attempting to make non-ready clients become ready.
667+ let mut third_batch_futures = vec ! [ ] ;
668+ let mut third_batch_results = vec ! [ ] ;
666669 for candidate in to_retry {
667670 // If the candidate hasn't luckily transferred into the correct state in the meantime,
668671 // force an update of the state.
@@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
676679 } ;
677680
678681 match new_status {
679- Ok ( ( ) ) => try_func ! ( candidate) ,
680- Err ( CandidateError :: NotSynced ) if require_synced == false => try_func ! ( candidate) ,
681- Err ( e) => {
682- results. push ( Err ( (
683- candidate. beacon_node . to_string ( ) ,
684- Error :: Unavailable ( e) ,
685- ) ) ) ;
682+ Ok ( ( ) ) => third_batch_futures. push ( run_on_candidate ( candidate) ) ,
683+ Err ( CandidateError :: NotSynced ) if require_synced == false => {
684+ third_batch_futures. push ( run_on_candidate ( candidate) )
686685 }
686+ Err ( e) => third_batch_results. push ( Err ( (
687+ candidate. beacon_node . to_string ( ) ,
688+ Error :: Unavailable ( e) ,
689+ ) ) ) ,
687690 }
688691 }
692+ third_batch_results. extend ( futures:: future:: join_all ( third_batch_futures) . await ) ;
693+
694+ let mut results = first_batch_results;
695+ results. extend ( second_batch_results) ;
696+ results. extend ( third_batch_results) ;
689697
690698 let errors: Vec < _ > = results. into_iter ( ) . filter_map ( |res| res. err ( ) ) . collect ( ) ;
691699
0 commit comments