Skip to content

Commit e3c3142

Browse files
committed
Broadcast VC requests in parallel (#6223)
Squashed commit of the following: commit f0b4a0a Author: Michael Sproul <[email protected]> Date: Tue Aug 6 11:58:56 2024 +1000 Try some things commit e2f1875 Author: Michael Sproul <[email protected]> Date: Tue Aug 6 11:50:35 2024 +1000 Remove outdated comment commit 763a6ae Author: Michael Sproul <[email protected]> Date: Mon Aug 5 13:27:55 2024 +1000 Broadcast VC requests in parallel
1 parent ca6ba2e commit e3c3142

File tree

3 files changed

+68
-41
lines changed

3 files changed

+68
-41
lines changed

common/eth2/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ impl fmt::Display for Error {
121121
pub struct Timeouts {
122122
pub attestation: Duration,
123123
pub attester_duties: Duration,
124+
pub attestation_subscriptions: Duration,
124125
pub liveness: Duration,
125126
pub proposal: Duration,
126127
pub proposer_duties: Duration,
@@ -137,6 +138,7 @@ impl Timeouts {
137138
Timeouts {
138139
attestation: timeout,
139140
attester_duties: timeout,
141+
attestation_subscriptions: timeout,
140142
liveness: timeout,
141143
proposal: timeout,
142144
proposer_duties: timeout,
@@ -2515,7 +2517,12 @@ impl BeaconNodeHttpClient {
25152517
.push("validator")
25162518
.push("beacon_committee_subscriptions");
25172519

2518-
self.post(path, &subscriptions).await?;
2520+
self.post_with_timeout(
2521+
path,
2522+
&subscriptions,
2523+
self.timeouts.attestation_subscriptions,
2524+
)
2525+
.await?;
25192526

25202527
Ok(())
25212528
}

validator_client/src/beacon_node_fallback.rs

Lines changed: 57 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
184184
spec: &ChainSpec,
185185
log: &Logger,
186186
) -> Result<(), CandidateError> {
187+
debug!(
188+
log,
189+
"Refresh started";
190+
"endpoint" => %self.beacon_node,
191+
);
187192
let previous_status = self.status(RequireSynced::Yes).await;
188193
let was_offline = matches!(previous_status, Err(CandidateError::Offline));
189194

@@ -203,6 +208,12 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
203208
// holding a write-lock whilst we check the online status of the node.
204209
*self.status.write().await = new_status;
205210

211+
debug!(
212+
log,
213+
"Refresh complete";
214+
"endpoint" => %self.beacon_node,
215+
);
216+
206217
new_status
207218
}
208219

@@ -599,46 +610,42 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
599610
F: Fn(&'a BeaconNodeHttpClient) -> R,
600611
R: Future<Output = Result<O, Err>>,
601612
{
602-
let mut results = vec![];
603613
let mut to_retry = vec![];
604614
let mut retry_unsynced = vec![];
605615

606616
// Run `func` using a `candidate`, returning the value or capturing errors.
607-
//
608-
// We use a macro instead of a closure here since it is not trivial to move `func` into a
609-
// closure.
610-
macro_rules! try_func {
611-
($candidate: ident) => {{
612-
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
617+
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
618+
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
613619

614-
// There exists a race condition where `func` may be called when the candidate is
615-
// actually not ready. We deem this an acceptable inefficiency.
616-
match func(&$candidate.beacon_node).await {
617-
Ok(val) => results.push(Ok(val)),
618-
Err(e) => {
619-
// If we have an error on this function, make the client as not-ready.
620-
//
621-
// There exists a race condition where the candidate may have been marked
622-
// as ready between the `func` call and now. We deem this an acceptable
623-
// inefficiency.
624-
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
625-
$candidate.set_offline().await;
626-
}
627-
results.push(Err((
628-
$candidate.beacon_node.to_string(),
629-
Error::RequestFailed(e),
630-
)));
631-
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
620+
// There exists a race condition where `func` may be called when the candidate is
621+
// actually not ready. We deem this an acceptable inefficiency.
622+
match func(&candidate.beacon_node).await {
623+
Ok(val) => Ok(val),
624+
Err(e) => {
625+
// If we have an error on this function, mark the client as not-ready.
626+
//
627+
// There exists a race condition where the candidate may have been marked
628+
// as ready between the `func` call and now. We deem this an acceptable
629+
// inefficiency.
630+
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
631+
candidate.set_offline().await;
632632
}
633+
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
634+
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
633635
}
634-
}};
635-
}
636+
}
637+
};
636638

637639
// First pass: try `func` on all synced and ready candidates.
638640
//
639641
// This ensures that we always choose a synced node if it is available.
642+
debug!(self.log, "Starting first batch of subscription requests");
643+
let mut first_batch_futures = vec![];
640644
for candidate in &self.candidates {
641645
match candidate.status(RequireSynced::Yes).await {
646+
Ok(_) => {
647+
first_batch_futures.push(run_on_candidate(candidate));
648+
}
642649
Err(CandidateError::NotSynced) if require_synced == false => {
643650
// This client is unsynced we will try it after trying all synced clients
644651
retry_unsynced.push(candidate);
@@ -647,22 +654,26 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
647654
// This client was not ready on the first pass, we might try it again later.
648655
to_retry.push(candidate);
649656
}
650-
Ok(_) => try_func!(candidate),
651657
}
652658
}
659+
let first_batch_results = futures::future::join_all(first_batch_futures).await;
653660

654661
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
655662
// unsynced candidates.
656663
//
657664
// Due to async race-conditions, it is possible that we will send a request to a candidate
658665
// that has been set to an offline/unready status. This is acceptable.
659-
if require_synced == false {
660-
for candidate in retry_unsynced {
661-
try_func!(candidate);
662-
}
663-
}
666+
debug!(self.log, "Starting second batch of subscription requests");
667+
let second_batch_results = if require_synced == false {
668+
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
669+
} else {
670+
vec![]
671+
};
664672

665673
// Third pass: try again, attempting to make non-ready clients become ready.
674+
debug!(self.log, "Starting third batch of subscription requests");
675+
let mut third_batch_futures = vec![];
676+
let mut third_batch_results = vec![];
666677
for candidate in to_retry {
667678
// If the candidate hasn't luckily transferred into the correct state in the meantime,
668679
// force an update of the state.
@@ -676,16 +687,22 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
676687
};
677688

678689
match new_status {
679-
Ok(()) => try_func!(candidate),
680-
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
681-
Err(e) => {
682-
results.push(Err((
683-
candidate.beacon_node.to_string(),
684-
Error::Unavailable(e),
685-
)));
690+
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
691+
Err(CandidateError::NotSynced) if require_synced == false => {
692+
third_batch_futures.push(run_on_candidate(candidate))
686693
}
694+
Err(e) => third_batch_results.push(Err((
695+
candidate.beacon_node.to_string(),
696+
Error::Unavailable(e),
697+
))),
687698
}
688699
}
700+
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
701+
debug!(self.log, "Completed all subscription requests");
702+
703+
let mut results = first_batch_results;
704+
results.extend(second_batch_results);
705+
results.extend(third_batch_results);
689706

690707
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
691708

validator_client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
7575
/// This can help ensure that proper endpoint fallback occurs.
7676
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
7777
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
78+
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
7879
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
7980
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
8081
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
@@ -323,6 +324,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
323324
Timeouts {
324325
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
325326
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
327+
attestation_subscriptions: slot_duration
328+
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
326329
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
327330
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
328331
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,

0 commit comments

Comments
 (0)