diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs
index 05af9c5232d..2673d9d01e6 100644
--- a/consensus/types/src/slot_epoch.rs
+++ b/consensus/types/src/slot_epoch.rs
@@ -57,6 +57,13 @@ impl Slot {
     pub fn max_value() -> Slot {
         Slot(u64::MAX)
     }
+
+    /// Returns `true` if this slot is the first slot of its epoch, i.e. the slot number is
+    /// an exact multiple of `slots_per_epoch`. Following `u64::is_multiple_of`, a zero
+    /// `slots_per_epoch` yields `true` only for slot 0 and never panics.
+    pub fn is_start_slot_in_epoch(&self, slots_per_epoch: u64) -> bool {
+        self.0.is_multiple_of(slots_per_epoch)
+    }
 }
 
 impl Epoch {
diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs
index 5f8c8a04b7e..afdc1b8b425 100644
--- a/validator_client/beacon_node_fallback/src/lib.rs
+++ b/validator_client/beacon_node_fallback/src/lib.rs
@@ -652,6 +652,49 @@ impl BeaconNodeFallback {
             .map(|(val, _)| val)
     }
 
+    /// Run `fetch_func` against each candidate in `self`, one candidate at a time and in
+    /// candidate order, handing every successful response to `consensus_check`.
+    ///
+    /// Returns the first response for which `consensus_check` returns `true`. If no response
+    /// is accepted, the errors collected along the way are returned; that list is empty when
+    /// every candidate responded but none of the responses was accepted.
+    pub async fn first_n_responses<F, O, Err, R, C>(
+        &self,
+        fetch_func: F,
+        mut consensus_check: C,
+    ) -> Result<(O, usize), Errors<Err>>
+    where
+        F: Fn(BeaconNodeHttpClient) -> R,
+        R: Future<Output = Result<O, Err>>,
+        C: FnMut(&(O, usize)) -> bool,
+        O: Eq + Clone + Debug,
+        Err: Debug,
+    {
+        // Build one request future per candidate while holding the read lock. Nothing is
+        // awaited here; the requests are only driven by the sequential loop below.
+        let candidates = self.candidates.read().await;
+        let pending_requests: Vec<_> = candidates
+            .iter()
+            .map(|candidate| Self::run_on_candidate(candidate.beacon_node.clone(), &fetch_func))
+            .collect();
+        // Release the read lock before awaiting any request.
+        drop(candidates);
+
+        // Await the requests sequentially, consulting `consensus_check` after each response
+        // and stopping as soon as it accepts one.
+        let mut errors = vec![];
+        for request in pending_requests {
+            match request.await {
+                Ok(response) if consensus_check(&response) => return Ok(response),
+                // A rejected response is not an error; move on to the next candidate.
+                Ok(_) => {}
+                Err(e) => errors.push(e),
+            }
+        }
+
+        Err(Errors(errors))
+    }
+
     /// Run `func` against each candidate in `self`, returning immediately if a result is found.
     /// Otherwise, return all the errors encountered along the way.
     pub async fn first_success_with_index(
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index 4ef560c4049..df9f7dcdeec 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -516,6 +516,9 @@ impl ProductionValidatorClient {
             .beacon_nodes(beacon_nodes.clone())
             .executor(context.executor.clone())
             .head_monitor_rx(head_monitor_rx)
+            // NOTE(review): hard-coded threshold; presumably this should come from `config`
+            // and must not exceed the number of configured beacon nodes. Confirm.
+            .consensus_threshold(3)
             .chain_spec(context.eth2_config.spec.clone())
             .disable(config.disable_attesting);
 
diff --git a/validator_client/validator_services/src/attestation_data_service.rs b/validator_client/validator_services/src/attestation_data_service.rs
index ab8e5dfad25..f8360383cc0 100644
--- a/validator_client/validator_services/src/attestation_data_service.rs
+++ b/validator_client/validator_services/src/attestation_data_service.rs
@@ -1,9 +1,154 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use beacon_node_fallback::BeaconNodeFallback;
+use safe_arith::SafeArith;
 use slot_clock::SlotClock;
 use tracing::{Instrument, info_span};
-use types::{AttestationData, Slot};
+use types::{AttestationData, Checkpoint, Epoch, Slot};
+
+/// How attestation data is obtained from the configured beacon nodes.
+#[derive(Debug, Clone)]
+pub enum AttestationDataStrategy {
+    /// Query the beacon nodes through the fallback and use the first successful response.
+    Fallback,
+    /// Like `Fallback`, but passes the given beacon node index to the fallback.
+    ByIndex(usize),
+    /// Require `threshold` (first field) beacon nodes to report the same target checkpoint.
+    /// When the optional `(target checkpoint, preferred node index)` pair is set, a response
+    /// from the preferred node, or one matching that checkpoint, is accepted immediately.
+    Consensus((usize, Option<(Checkpoint, usize)>)),
+    /// Score the responses of the beacon nodes and use the highest-scoring one.
+    HighestScore,
+    /// Do not fetch attestation data; downloading fails for the given epoch.
+    IgnoreEpoch(Epoch),
+}
+
+/// Incrementally folds beacon node responses into a single attestation data choice.
+///
+/// Responses are fed in one at a time together with the index of the node they came from.
+trait ResultAggregator {
+    /// Record one response. Returns `true` once the aggregator has what it needs and the
+    /// caller should stop querying and use `get_result`.
+    fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool;
+
+    /// The selected attestation data and the index of the node that produced it, if any.
+    fn get_result(&self) -> Option<(AttestationData, usize)>;
+}
+
+/// Accepts attestation data once `threshold` beacon nodes agree on the target checkpoint.
+struct ConsensusAggregator {
+    /// Indices of the nodes that reported each target checkpoint.
+    results: HashMap<Checkpoint, Vec<usize>>,
+    /// Number of matching responses required to reach consensus.
+    threshold: usize,
+    /// Target checkpoint to match and the preferred node index, if known.
+    target_checkpoint_and_index: Option<(Checkpoint, usize)>,
+    /// The accepted attestation data and the index of the node that supplied it.
+    consensus_result: Option<(AttestationData, usize)>,
+}
+
+impl ConsensusAggregator {
+    fn new(threshold: usize, target_checkpoint_and_index: Option<(Checkpoint, usize)>) -> Self {
+        Self {
+            results: HashMap::new(),
+            threshold,
+            target_checkpoint_and_index,
+            consensus_result: None,
+        }
+    }
+}
+
+impl ResultAggregator for ConsensusAggregator {
+    fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool {
+        if let Some((target_checkpoint, preferred_index)) = self.target_checkpoint_and_index {
+            // Accept immediately when the response comes from the preferred node or matches
+            // the known target checkpoint.
+            // TODO(attestation-consensus): returning the preferred node's data without
+            // comparing its target checkpoint is a small optimization; checking it as well
+            // may be the safer choice.
+            if preferred_index == index || attestation_data.target == target_checkpoint {
+                self.consensus_result = Some((attestation_data.clone(), index));
+                return true;
+            }
+        }
+
+        // Record the vote and check whether this target checkpoint now has enough votes.
+        let voters = self.results.entry(attestation_data.target).or_default();
+        voters.push(index);
+        if voters.len() >= self.threshold {
+            self.consensus_result = Some((attestation_data.clone(), index));
+            return true;
+        }
+
+        false
+    }
+
+    fn get_result(&self) -> Option<(AttestationData, usize)> {
+        self.consensus_result.clone()
+    }
+}
+
+/// Scores every response and selects the highest-scoring attestation data.
+struct ScoreAggregator {
+    /// Score and attestation data per responding node index.
+    results: HashMap<usize, (u64, AttestationData)>,
+    // TODO: the head slot is most likely just the requested slot; double check the
+    // attestation service before deleting this TODO.
+    head_slot: Slot,
+    /// Number of responses after which aggregation stops.
+    responses_needed: usize,
+    /// Number of responses processed so far.
+    responses_received: usize,
+}
+
+impl ScoreAggregator {
+    fn new(head_slot: Slot, responses_needed: usize) -> Self {
+        Self {
+            results: HashMap::new(),
+            head_slot,
+            responses_needed,
+            responses_received: 0,
+        }
+    }
+
+    /// Score a response: the sum of its source and target epochs, plus the integer quotient
+    /// `1 / (1 + slot - head_slot)`, which is 1 when the data is for `head_slot` and 0 for
+    /// later slots.
+    ///
+    /// Data for a slot before `head_slot` gets no slot bonus; the unchecked form of this
+    /// computation would underflow or divide by zero there and panic.
+    fn calculate_score(&self, attestation_data: &AttestationData) -> u64 {
+        let checkpoint_value = attestation_data.source.epoch + attestation_data.target.epoch;
+        let slot_bonus = attestation_data
+            .slot
+            .as_u64()
+            .safe_sub(self.head_slot.as_u64())
+            .and_then(|distance| distance.safe_add(1))
+            .and_then(|slot_value| 1u64.safe_div(slot_value))
+            .unwrap_or(0);
+        checkpoint_value.as_u64().saturating_add(slot_bonus)
+    }
+}
+
+impl ResultAggregator for ScoreAggregator {
+    fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool {
+        let score = self.calculate_score(attestation_data);
+        self.results.insert(index, (score, attestation_data.clone()));
+        self.responses_received += 1;
+
+        // Stop once the expected number of responses has been received.
+        self.responses_received >= self.responses_needed
+    }
+
+    fn get_result(&self) -> Option<(AttestationData, usize)> {
+        // Highest score wins; ties go to the lowest node index so that the choice does not
+        // depend on `HashMap` iteration order.
+        self.results
+            .iter()
+            .max_by_key(|(index, (score, _))| (*score, std::cmp::Reverse(**index)))
+            .map(|(index, (_, data))| (data.clone(), *index))
+    }
+}
 
 /// The AttestationDataService is responsible for downloading and caching attestation data at a given slot.
 /// It also helps prevent us from re-downloading identical attestation data.
@@ -16,13 +141,14 @@ impl AttestationDataService {
         Self { beacon_nodes }
     }
 
-    pub async fn download_data(
+    /// Fetch attestation data for `request_slot` through `first_success_from_index`, handing
+    /// it `candidate_beacon_node`. Returns the data with the serving node's index.
+    async fn data_by_index(
         &self,
         request_slot: &Slot,
         candidate_beacon_node: Option<usize>,
     ) -> Result<(AttestationData, usize), String> {
-        let (attestation_data, node_index) = self
-            .beacon_nodes
+        self.beacon_nodes
             .first_success_from_index(candidate_beacon_node, |beacon_node| async move {
                 let _timer = validator_metrics::start_timer_vec(
                     &validator_metrics::ATTESTATION_SERVICE_TIMES,
@@ -36,9 +162,79 @@ impl AttestationDataService {
             })
             .instrument(info_span!("fetch_attestation_data"))
             .await
+            .map_err(|e| e.to_string())
+    }
+
+    /// Query the beacon nodes via `first_n_responses`, feeding every successful response into
+    /// `aggregator`, and return the aggregator's selection.
+    ///
+    /// A selection is returned even if some beacon nodes failed; the request errors are only
+    /// reported when the aggregator has nothing to offer.
+    async fn data_with_aggregation(
+        &self,
+        request_slot: &Slot,
+        mut aggregator: impl ResultAggregator,
+    ) -> Result<(AttestationData, usize), String> {
+        let outcome = self
+            .beacon_nodes
+            .first_n_responses(
+                |beacon_node| async move {
+                    let _timer = validator_metrics::start_timer_vec(
+                        &validator_metrics::ATTESTATION_SERVICE_TIMES,
+                        &[validator_metrics::ATTESTATIONS_HTTP_GET],
+                    );
+                    beacon_node
+                        .get_validator_attestation_data(*request_slot, 0)
+                        .await
+                        .map_err(|e| format!("Failed to produce attestation data: {:?}", e))
+                        .map(|result| result.data)
+                },
+                |(attestation_data, index)| aggregator.process_result(attestation_data, *index),
+            )
+            .instrument(info_span!("fetch_attestation_data"))
+            .await;
-            .map_err(|e| e.to_string())?;
 
-        Ok((attestation_data, node_index))
+        // Prefer whatever the aggregator selected: `first_n_responses` also reports an error
+        // when some nodes failed or when it ran out of candidates, which must not discard the
+        // responses that were already aggregated.
+        if let Some(selected) = aggregator.get_result() {
+            return Ok(selected);
+        }
+
+        outcome.map_err(|e| e.to_string())?;
+        Err("No valid attestation data found".to_string())
+    }
+
+    /// Download attestation data for `request_slot` using `strategy`.
+    ///
+    /// On success returns the attestation data together with the index of the beacon node
+    /// that supplied it. `AttestationDataStrategy::IgnoreEpoch` always yields an error
+    /// without contacting any beacon node.
+    pub async fn download_data(
+        &self,
+        request_slot: &Slot,
+        strategy: &AttestationDataStrategy,
+    ) -> Result<(AttestationData, usize), String> {
+        match strategy {
+            AttestationDataStrategy::Fallback => self.data_by_index(request_slot, None).await,
+            AttestationDataStrategy::ByIndex(index) => {
+                self.data_by_index(request_slot, Some(*index)).await
+            }
+            AttestationDataStrategy::Consensus((threshold, checkpoint_and_index)) => {
+                let aggregator = ConsensusAggregator::new(*threshold, *checkpoint_and_index);
+                self.data_with_aggregation(request_slot, aggregator).await
+            }
+            AttestationDataStrategy::HighestScore => {
+                // Expect one response per configured beacon node before choosing.
+                let aggregator =
+                    ScoreAggregator::new(*request_slot, self.beacon_nodes.num_total().await);
+                self.data_with_aggregation(request_slot, aggregator).await
+            }
+            AttestationDataStrategy::IgnoreEpoch(epoch) => Err(format!(
+                "Disabled attestation production for epoch {:?}",
+                epoch
+            )),
+        }
     }
 }
 
@@ -54,7 +250,7 @@ mod tests {
         MinimalEthSpec, Slot,
     };
 
-    use crate::attestation_data_service::AttestationDataService;
+    use crate::attestation_data_service::{AttestationDataService, AttestationDataStrategy};
 
     fn create_attestation_data(
         slot: Slot,
@@ -156,7 +352,9 @@ mod tests {
         ));
 
         let service = AttestationDataService::::new(Arc::new(fallback));
-        let result = service.download_data(&slot, None).await;
+        let result = service
+            .download_data(&slot, &AttestationDataStrategy::Fallback)
+            .await;
 
         // Verify download is successful
         assert!(result.is_ok());
@@ -186,7 +384,9 @@ mod tests {
         ));
 
         let service = AttestationDataService::::new(Arc::new(fallback));
-        let result = service.download_data(&slot, None).await;
+        let result = service
+            .download_data(&slot, &AttestationDataStrategy::Fallback)
+            .await;
 
         // Verify all nodes offline
         assert!(result.is_err());
@@ -224,7 +424,9 @@ mod tests {
         ));
 
         let service = AttestationDataService::::new(Arc::new(fallback));
-        let result = service.download_data(&slot, None).await;
+        let result = service
+            .download_data(&slot, &AttestationDataStrategy::Fallback)
+            .await;
 
         // Verify download is successful and we fell back to the next node
         assert!(result.is_ok());
diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs
index 19223386fbe..0d2c5772358 100644
--- a/validator_client/validator_services/src/attestation_service.rs
+++ 
b/validator_client/validator_services/src/attestation_service.rs @@ -1,7 +1,7 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use tokio::sync::Mutex; -use crate::attestation_data_service::AttestationDataService; +use crate::attestation_data_service::{AttestationDataService, AttestationDataStrategy}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent}; use futures::future::join_all; use logging::crit; @@ -14,7 +14,9 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, Span, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{ + Attestation, AttestationData, ChainSpec, Checkpoint, CommitteeIndex, Epoch, EthSpec, Slot, +}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Builds an `AttestationService`. @@ -28,6 +30,8 @@ pub struct AttestationServiceBuilder chain_spec: Option>, head_monitor_rx: Option>>>, attestation_data_service: Option>>, + latest_target_checkpoint: Arc>>, + consensus_threshold: Option, disable: bool, } @@ -42,6 +46,8 @@ impl AttestationServiceBuil chain_spec: None, head_monitor_rx: None, attestation_data_service: None, + latest_target_checkpoint: Arc::new(Mutex::new(None)), + consensus_threshold: None, disable: false, } } @@ -83,6 +89,11 @@ impl AttestationServiceBuil self } + pub fn consensus_threshold(mut self, threshold: usize) -> Self { + self.consensus_threshold = Some(threshold); + self + } + pub fn head_monitor_rx( mut self, head_monitor_rx: Option>>>, @@ -115,6 +126,8 @@ impl AttestationServiceBuil .attestation_data_service .ok_or("Cannot build AttestationService without attestation_data_service")?, head_monitor_rx: self.head_monitor_rx, + latest_target_checkpoint: self.latest_target_checkpoint, + consensus_threshold: self.consensus_threshold, disable: self.disable, latest_attested_slot: 
Mutex::new(Slot::default()), }), @@ -132,6 +145,8 @@ pub struct Inner { chain_spec: Arc, head_monitor_rx: Option>>>, attestation_data_service: Arc>, + latest_target_checkpoint: Arc>>, + consensus_threshold: Option, disable: bool, latest_attested_slot: Mutex, } @@ -170,6 +185,13 @@ impl AttestationService AttestationService AttestationService *last_slot = current_slot, Err(e) => { crit!(error = e, "Failed to spawn attestation tasks") @@ -247,7 +274,7 @@ impl AttestationService, + attestation_data_strategy: AttestationDataStrategy, ) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self @@ -260,16 +287,62 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); let attestation_service = self.clone(); + // If we're using the Consensus strategy we need to handle the following situations: + // - The first slot in the epoch + // - A slot thats within an attestable epoch + // - A slot thats not within an attestable epoch + // - A slot that is not the first in an epoch and no target checkpoint to compare + let attestation_data_strategy = + if let AttestationDataStrategy::Consensus((threshold, _)) = attestation_data_strategy { + if slot.is_start_slot_in_epoch(S::E::slots_per_epoch()) { + // if the current slot is the first slot in the epoch use the default consensus strategy + attestation_data_strategy + } else if let Some((latest_attestable_epoch, target_checkpoint, preferred_index)) = + *attestation_service.latest_target_checkpoint.blocking_lock() + { + if slot.epoch(S::E::slots_per_epoch()) == latest_attestable_epoch { + // If the current slot is within the latest attestable epoch, we can attest + // using the `preferred_index` or nodes that have a matching `target_checkpoint` + AttestationDataStrategy::Consensus(( + threshold, + Some((target_checkpoint, preferred_index)), + )) + } else { + // If the current slot is not within the latest attestable epoch, we cannot 
attest + AttestationDataStrategy::IgnoreEpoch(slot.epoch(S::E::slots_per_epoch())) + } + } else { + // If the current slot is not the first slot in an epoch and there is no target checkpoint to compare, + // run the default consensus strategy. This can happen if the attestation service was initially + // launched in the middle of an epoch. + attestation_data_strategy + } + } else { + attestation_data_strategy + }; + let attestation_data_handle = self .inner .executor .spawn_handle( async move { - let attestation_data = attestation_service + let (attestation_data, index) = attestation_service .attestation_data_service - .download_data(&slot, beacon_node_index) - .await - .map(|(data, _)| data)?; + .download_data(&slot, &attestation_data_strategy) + .await?; + + // If we're using the consensus strategy and we fetched attestation data for the first slot of an epoch + // update `latest_target_checkpoint` so we can query subsequent epochs + if let AttestationDataStrategy::Consensus(_) = attestation_data_strategy + && slot.is_start_slot_in_epoch(S::E::slots_per_epoch()) + { + let mut guard = attestation_service.latest_target_checkpoint.lock().await; + *guard = Some(( + slot.epoch(S::E::slots_per_epoch()), + attestation_data.target, + index, + )); + } attestation_service .sign_and_publish_attestations(