Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl fmt::Display for Error {
pub struct Timeouts {
pub attestation: Duration,
pub attester_duties: Duration,
pub attestation_subscriptions: Duration,
pub liveness: Duration,
pub proposal: Duration,
pub proposer_duties: Duration,
Expand All @@ -137,6 +138,7 @@ impl Timeouts {
Timeouts {
attestation: timeout,
attester_duties: timeout,
attestation_subscriptions: timeout,
liveness: timeout,
proposal: timeout,
proposer_duties: timeout,
Expand Down Expand Up @@ -2515,7 +2517,12 @@ impl BeaconNodeHttpClient {
.push("validator")
.push("beacon_committee_subscriptions");

self.post(path, &subscriptions).await?;
self.post_with_timeout(
path,
&subscriptions,
self.timeouts.attestation_subscriptions,
)
.await?;

Ok(())
}
Expand Down
88 changes: 48 additions & 40 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
}
}

impl<T> Errors<T> {
pub fn num_errors(&self) -> usize {
self.0.len()
}
}

/// Reasons why a candidate might not be ready.
#[derive(Debug, Clone, Copy)]
pub enum CandidateError {
Expand Down Expand Up @@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
{
let mut results = vec![];
let mut to_retry = vec![];
let mut retry_unsynced = vec![];

// Run `func` using a `candidate`, returning the value or capturing errors.
//
// We use a macro instead of a closure here since it is not trivial to move `func` into a
// closure.
macro_rules! try_func {
($candidate: ident) => {{
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);

// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&$candidate.beacon_node).await {
Ok(val) => results.push(Ok(val)),
Err(e) => {
// If we have an error on this function, make the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
results.push(Err((
$candidate.beacon_node.to_string(),
Error::RequestFailed(e),
)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&candidate.beacon_node).await {
Ok(val) => Ok(val),
Err(e) => {
// If we have an error on this function, mark the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
candidate.set_offline().await;
}
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
}
}};
}
}
};

// First pass: try `func` on all synced and ready candidates.
//
// This ensures that we always choose a synced node if it is available.
let mut first_batch_futures = vec![];
for candidate in &self.candidates {
match candidate.status(RequireSynced::Yes).await {
Ok(_) => {
first_batch_futures.push(run_on_candidate(candidate));
}
Err(CandidateError::NotSynced) if require_synced == false => {
// This client is unsynced we will try it after trying all synced clients
retry_unsynced.push(candidate);
Expand All @@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// This client was not ready on the first pass, we might try it again later.
to_retry.push(candidate);
}
Ok(_) => try_func!(candidate),
}
}
let first_batch_results = futures::future::join_all(first_batch_futures).await;

// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
// unsynced candidates.
//
// Due to async race-conditions, it is possible that we will send a request to a candidate
// that has been set to an offline/unready status. This is acceptable.
if require_synced == false {
for candidate in retry_unsynced {
try_func!(candidate);
}
}
let second_batch_results = if require_synced == false {
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
} else {
vec![]
};

// Third pass: try again, attempting to make non-ready clients become ready.
let mut third_batch_futures = vec![];
let mut third_batch_results = vec![];
for candidate in to_retry {
// If the candidate hasn't luckily transferred into the correct state in the meantime,
// force an update of the state.
Expand All @@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
};

match new_status {
Ok(()) => try_func!(candidate),
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
Err(e) => {
results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
)));
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
Err(CandidateError::NotSynced) if require_synced == false => {
third_batch_futures.push(run_on_candidate(candidate))
}
Err(e) => third_batch_results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
))),
}
}
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);

let mut results = first_batch_results;
results.extend(second_batch_results);
results.extend(third_batch_results);

let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();

Expand Down
53 changes: 35 additions & 18 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ const _: () = assert!({
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD);

// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
#[derive(Debug)]
Expand Down Expand Up @@ -121,6 +122,8 @@ pub struct DutyAndProof {
pub struct SubscriptionSlots {
/// Pairs of `(slot, already_sent)` in slot-descending order.
slots: Vec<(Slot, AtomicBool)>,
/// The slot of the duty itself.
duty_slot: Slot,
}

/// Create a selection proof for `duty`.
Expand Down Expand Up @@ -172,18 +175,20 @@ impl SubscriptionSlots {
.filter(|scheduled_slot| *scheduled_slot > current_slot)
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
Arc::new(Self { slots, duty_slot })
}

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot
&& self
.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
Expand Down Expand Up @@ -737,7 +742,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
// If there are any subscriptions, push them out to beacon nodes
if !subscriptions.is_empty() {
let subscriptions_ref = &subscriptions;
if let Err(e) = duties_service
let subscription_result = duties_service
.beacon_nodes
.request(
RequireSynced::No,
Expand All @@ -753,15 +758,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.await
},
)
.await
{
error!(
log,
"Failed to subscribe validators";
"error" => %e
)
} else {
// Record that subscriptions were successfully sent.
.await;
if subscription_result.as_ref().is_ok() {
debug!(
log,
"Broadcast attestation subscriptions";
Expand All @@ -770,6 +768,25 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
} else if let Err(e) = subscription_result {
if e.num_errors() < duties_service.beacon_nodes.num_total() {
warn!(
log,
"Some subscriptions failed";
"error" => %e,
);
// If subscriptions were sent to at least one node, regard that as a success.
// There is some redundancy built into the subscription schedule to handle failures.
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
} else {
error!(
log,
"All subscriptions failed";
"error" => %e
);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
/// This can help ensure that proper endpoint fallback occurs.
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
Expand Down Expand Up @@ -323,6 +324,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Timeouts {
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
attestation_subscriptions: slot_duration
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
Expand Down