Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 42 additions & 37 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,46 +599,44 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
{
let mut results = vec![];
let mut to_retry = vec![];
let mut retry_unsynced = vec![];

// Run `func` using a `candidate`, returning the value or capturing errors.
//
// We use a macro instead of a closure here since it is not trivial to move `func` into a
// closure.
macro_rules! try_func {
($candidate: ident) => {{
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);

// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&$candidate.beacon_node).await {
Ok(val) => results.push(Ok(val)),
Err(e) => {
// If we have an error on this function, make the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
results.push(Err((
$candidate.beacon_node.to_string(),
Error::RequestFailed(e),
)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&candidate.beacon_node).await {
Ok(val) => Ok(val),
Err(e) => {
// If we have an error on this function, mark the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
candidate.set_offline().await;
}
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
}
}};
}
}
};

// First pass: try `func` on all synced and ready candidates.
//
// This ensures that we always choose a synced node if it is available.
let mut first_batch_futures = vec![];
for candidate in &self.candidates {
match candidate.status(RequireSynced::Yes).await {
Ok(_) => {
first_batch_futures.push(run_on_candidate(candidate));
}
Err(CandidateError::NotSynced) if require_synced == false => {
// This client is unsynced; we will try it after trying all synced clients.
retry_unsynced.push(candidate);
Expand All @@ -647,22 +645,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// This client was not ready on the first pass, we might try it again later.
to_retry.push(candidate);
}
Ok(_) => try_func!(candidate),
}
}
let first_batch_results = futures::future::join_all(first_batch_futures).await;

// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
// unsynced candidates.
//
// Due to async race-conditions, it is possible that we will send a request to a candidate
// that has been set to an offline/unready status. This is acceptable.
if require_synced == false {
for candidate in retry_unsynced {
try_func!(candidate);
}
}
let second_batch_results = if require_synced == false {
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
} else {
vec![]
};

// Third pass: try again, attempting to make non-ready clients become ready.
let mut third_batch_futures = vec![];
let mut third_batch_results = vec![];
for candidate in to_retry {
// If the candidate hasn't luckily transferred into the correct state in the meantime,
// force an update of the state.
Expand All @@ -676,16 +676,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
};

match new_status {
Ok(()) => try_func!(candidate),
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
Err(e) => {
results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
)));
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
Err(CandidateError::NotSynced) if require_synced == false => {
third_batch_futures.push(run_on_candidate(candidate))
}
Err(e) => third_batch_results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
))),
}
}
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);

let mut results = first_batch_results;
results.extend(second_batch_results);
results.extend(third_batch_results);

let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();

Expand Down