Skip to content

Commit e6882bf

Browse files
committed
Broadcast VC requests in parallel (#6223)
Squashed commit of the following: commit 763a6ae Author: Michael Sproul <[email protected]> Date: Mon Aug 5 13:27:55 2024 +1000 Broadcast VC requests in parallel
1 parent 214270f commit e6882bf

File tree

1 file changed

+42
-37
lines changed

1 file changed

+42
-37
lines changed

validator_client/src/beacon_node_fallback.rs

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -599,46 +599,44 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
599599
F: Fn(&'a BeaconNodeHttpClient) -> R,
600600
R: Future<Output = Result<O, Err>>,
601601
{
602-
let mut results = vec![];
603602
let mut to_retry = vec![];
604603
let mut retry_unsynced = vec![];
605604

606605
// Run `func` using a `candidate`, returning the value or capturing errors.
607606
//
608607
// We use an async closure here (rather than a macro) so that the returned futures can be
609608
// collected into batches and awaited in parallel.
610-
macro_rules! try_func {
611-
($candidate: ident) => {{
612-
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
609+
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
610+
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
613611

614-
// There exists a race condition where `func` may be called when the candidate is
615-
// actually not ready. We deem this an acceptable inefficiency.
616-
match func(&$candidate.beacon_node).await {
617-
Ok(val) => results.push(Ok(val)),
618-
Err(e) => {
619-
// If we have an error on this function, make the client as not-ready.
620-
//
621-
// There exists a race condition where the candidate may have been marked
622-
// as ready between the `func` call and now. We deem this an acceptable
623-
// inefficiency.
624-
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
625-
$candidate.set_offline().await;
626-
}
627-
results.push(Err((
628-
$candidate.beacon_node.to_string(),
629-
Error::RequestFailed(e),
630-
)));
631-
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
612+
// There exists a race condition where `func` may be called when the candidate is
613+
// actually not ready. We deem this an acceptable inefficiency.
614+
match func(&candidate.beacon_node).await {
615+
Ok(val) => Ok(val),
616+
Err(e) => {
617+
// If we have an error on this function, mark the client as not-ready.
618+
//
619+
// There exists a race condition where the candidate may have been marked
620+
// as ready between the `func` call and now. We deem this an acceptable
621+
// inefficiency.
622+
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
623+
candidate.set_offline().await;
632624
}
625+
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
626+
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
633627
}
634-
}};
635-
}
628+
}
629+
};
636630

637631
// First pass: try `func` on all synced and ready candidates.
638632
//
639633
// This ensures that we always choose a synced node if it is available.
634+
let mut first_batch_futures = vec![];
640635
for candidate in &self.candidates {
641636
match candidate.status(RequireSynced::Yes).await {
637+
Ok(_) => {
638+
first_batch_futures.push(run_on_candidate(candidate));
639+
}
642640
Err(CandidateError::NotSynced) if require_synced == false => {
643641
// This client is unsynced; we will try it after trying all synced clients.
644642
retry_unsynced.push(candidate);
@@ -647,22 +645,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
647645
// This client was not ready on the first pass, we might try it again later.
648646
to_retry.push(candidate);
649647
}
650-
Ok(_) => try_func!(candidate),
651648
}
652649
}
650+
let first_batch_results = futures::future::join_all(first_batch_futures).await;
653651

654652
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
655653
// unsynced candidates.
656654
//
657655
// Due to async race-conditions, it is possible that we will send a request to a candidate
658656
// that has been set to an offline/unready status. This is acceptable.
659-
if require_synced == false {
660-
for candidate in retry_unsynced {
661-
try_func!(candidate);
662-
}
663-
}
657+
let second_batch_results = if require_synced == false {
658+
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
659+
} else {
660+
vec![]
661+
};
664662

665663
// Third pass: try again, attempting to make non-ready clients become ready.
664+
let mut third_batch_futures = vec![];
665+
let mut third_batch_results = vec![];
666666
for candidate in to_retry {
667667
// If the candidate hasn't luckily transferred into the correct state in the meantime,
668668
// force an update of the state.
@@ -676,16 +676,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
676676
};
677677

678678
match new_status {
679-
Ok(()) => try_func!(candidate),
680-
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
681-
Err(e) => {
682-
results.push(Err((
683-
candidate.beacon_node.to_string(),
684-
Error::Unavailable(e),
685-
)));
679+
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
680+
Err(CandidateError::NotSynced) if require_synced == false => {
681+
third_batch_futures.push(run_on_candidate(candidate))
686682
}
683+
Err(e) => third_batch_results.push(Err((
684+
candidate.beacon_node.to_string(),
685+
Error::Unavailable(e),
686+
))),
687687
}
688688
}
689+
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
690+
691+
let mut results = first_batch_results;
692+
results.extend(second_batch_results);
693+
results.extend(third_batch_results);
689694

690695
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
691696

0 commit comments

Comments
 (0)