From ba92311361d5cead4ccd99be74f709881f114127 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 9 Dec 2025 14:20:35 -0300 Subject: [PATCH 1/5] Implement consensus mechanism --- consensus/types/src/slot_epoch.rs | 4 + .../beacon_node_fallback/src/lib.rs | 43 ++++++ .../src/attestation_data_service.rs | 123 ++++++++++++++++-- .../src/attestation_service.rs | 98 ++++++++++++-- 4 files changed, 249 insertions(+), 19 deletions(-) diff --git a/consensus/types/src/slot_epoch.rs b/consensus/types/src/slot_epoch.rs index 05af9c5232d..2673d9d01e6 100644 --- a/consensus/types/src/slot_epoch.rs +++ b/consensus/types/src/slot_epoch.rs @@ -57,6 +57,10 @@ impl Slot { pub fn max_value() -> Slot { Slot(u64::MAX) } + + pub fn is_start_slot_in_epoch(&self, slots_per_epoch: u64) -> bool { + self.0.is_multiple_of(slots_per_epoch) + } } impl Epoch { diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 5f8c8a04b7e..afdc1b8b425 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -652,6 +652,49 @@ impl BeaconNodeFallback { .map(|(val, _)| val) } + pub async fn first_n_responses( + &self, + fetch_func: F, + mut consensus_check: C, + ) -> Result<(O, usize), Errors> + where + F: Fn(BeaconNodeHttpClient) -> R, + R: Future>, + C: FnMut(&(O, usize)) -> bool, + O: Eq + Clone + Debug, + Err: Debug, + { + let mut errors = vec![]; + + // Collect all responses from all candidates + let candidates = self.candidates.read().await; + let mut futures = vec![]; + + for candidate in candidates.iter() { + futures.push(Self::run_on_candidate( + candidate.beacon_node.clone(), + &fetch_func, + )); + } + drop(candidates); + + // Process futures sequentially, checking consensus after each response + for future in futures { + match future.await { + Ok(val) => { + if consensus_check(&val) { + return Ok(val); + } + } + Err(e) => { + errors.push(e); + } + } + } + + Err(Errors(errors)) + } + /// Run `func` against each candidate in `self`, returning immediately if a result is found. /// Otherwise, return all the errors encountered along the way. pub async fn first_success_with_index( diff --git a/validator_client/validator_services/src/attestation_data_service.rs b/validator_client/validator_services/src/attestation_data_service.rs index ab8e5dfad25..ee8078afa4c 100644 --- a/validator_client/validator_services/src/attestation_data_service.rs +++ b/validator_client/validator_services/src/attestation_data_service.rs @@ -1,9 +1,32 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use beacon_node_fallback::BeaconNodeFallback; use slot_clock::SlotClock; use tracing::{Instrument, info_span}; -use types::{AttestationData, Slot}; +use types::{AttestationData, Checkpoint, Epoch, Slot}; + +#[derive(Debug, Clone)] +pub enum AttestationDataStrategy { + Fallback, + ByIndex(usize), + Consensus((usize, Option<(Checkpoint, usize)>)), + IgnoreEpoch(Epoch), +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct FFGConsensus { + pub source: Checkpoint, + pub target: Checkpoint, +} + +impl FFGConsensus { + pub fn new(attestation_data: &AttestationData) -> Self { + FFGConsensus { + source: attestation_data.source, + target: attestation_data.target, + } + } +} /// The AttestationDataService is responsible for downloading and caching attestation data at a given slot. /// It also helps prevent us from re-downloading identical attestation data. @@ -16,13 +39,12 @@ impl AttestationDataService { Self { beacon_nodes } } - pub async fn download_data( + async fn data_by_index( &self, request_slot: &Slot, candidate_beacon_node: Option, ) -> Result<(AttestationData, usize), String> { - let (attestation_data, node_index) = self - .beacon_nodes + self.beacon_nodes .first_success_from_index(candidate_beacon_node, |beacon_node| async move { let _timer = validator_metrics::start_timer_vec( &validator_metrics::ATTESTATION_SERVICE_TIMES, @@ -36,9 +58,82 @@ impl AttestationDataService { }) .instrument(info_span!("fetch_attestation_data")) .await - .map_err(|e| e.to_string())?; + .map_err(|e| e.to_string()) + } + + pub async fn data_by_threshold( + &self, + request_slot: &Slot, + threshold: usize, + checkpoint_and_index: Option<(Checkpoint, usize)>, + ) -> Result<(AttestationData, usize), String> { + let mut results = HashMap::new(); + + self.beacon_nodes + .first_n_responses( + |beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(*request_slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }, + |(attestation_data, index)| { + if let Some((target_checkpoint, preferred_index)) = checkpoint_and_index { + // If we have a preferred index set, return attestation data from it + // TODO(attestation-consensus) this is a small optimization to immediately return data + // from the preferred index. We shouldn't need to check the target checkpoint, but maybe + // its just safer to do so? + if preferred_index == *index { + return true; + } + // return if fetched data matches the target checkpoint + return attestation_data.target == target_checkpoint; + } + + let ffg_consensus = FFGConsensus::new(attestation_data); + results + .entry(ffg_consensus.clone()) + .or_insert_with(Vec::new) + .push(*index); + if results + .get(&ffg_consensus) + .is_some_and(|servers| servers.len() >= threshold) + { + return true; + } + + false + }, + ) + .instrument(info_span!("fetch_attestation_data")) + .await + .map_err(|e| e.to_string()) + } - Ok((attestation_data, node_index)) + pub async fn download_data( + &self, + request_slot: &Slot, + strategy: &AttestationDataStrategy, + ) -> Result<(AttestationData, usize), String> { + match strategy { + AttestationDataStrategy::Fallback => self.data_by_index(request_slot, None).await, + AttestationDataStrategy::ByIndex(index) => { + self.data_by_index(request_slot, Some(*index)).await + } + AttestationDataStrategy::Consensus((threshold, checkpoint_and_index)) => { + self.data_by_threshold(request_slot, *threshold, checkpoint_and_index.clone()) + .await + } + AttestationDataStrategy::IgnoreEpoch(epoch) => Err(format!( + "Disabled attestation production for epoch {:?}", + epoch + )), + } } } @@ -54,7 +149,7 @@ mod tests { MinimalEthSpec, Slot, }; - use crate::attestation_data_service::AttestationDataService; + use crate::attestation_data_service::{AttestationDataService, AttestationDataStrategy}; fn create_attestation_data( slot: Slot, @@ -156,7 +251,9 @@ mod tests { )); let service = AttestationDataService::::new(Arc::new(fallback)); - let result = service.download_data(&slot, None).await; + let result = service + .download_data(&slot, &AttestationDataStrategy::Fallback) + .await; // Verify download is successful assert!(result.is_ok()); @@ -186,7 +283,9 @@ mod tests { )); let service = AttestationDataService::::new(Arc::new(fallback)); - let result = service.download_data(&slot, None).await; + let result = service + .download_data(&slot, &AttestationDataStrategy::Fallback) + .await; // Verify all nodes offline assert!(result.is_err()); @@ -224,7 +323,9 @@ mod tests { )); let service = AttestationDataService::::new(Arc::new(fallback)); - let result = service.download_data(&slot, None).await; + let result = service + .download_data(&slot, &AttestationDataStrategy::Fallback) + .await; // Verify download is successful and we fell back to the next node assert!(result.is_ok()); diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 19223386fbe..55cd3e01b84 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,7 +1,7 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use tokio::sync::Mutex; -use crate::attestation_data_service::AttestationDataService; +use crate::attestation_data_service::{AttestationDataService, AttestationDataStrategy}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent}; use futures::future::join_all; use logging::crit; @@ -14,7 +14,9 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, Span, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{ + Attestation, AttestationData, ChainSpec, Checkpoint, CommitteeIndex, Epoch, EthSpec, Slot, +}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Builds an `AttestationService`. @@ -28,6 +30,8 @@ pub struct AttestationServiceBuilder chain_spec: Option>, head_monitor_rx: Option>>>, attestation_data_service: Option>>, + latest_target_checkpoint: Arc>>, + consensus_threshold: Option, disable: bool, } @@ -42,6 +46,8 @@ impl AttestationServiceBuil chain_spec: None, head_monitor_rx: None, attestation_data_service: None, + latest_target_checkpoint: Arc::new(Mutex::new(None)), + consensus_threshold: None, disable: false, } } @@ -83,6 +89,11 @@ impl AttestationServiceBuil self } + pub fn consensus_threshold(mut self, threshold: usize) -> Self { + self.consensus_threshold = Some(threshold); + self + } + pub fn head_monitor_rx( mut self, head_monitor_rx: Option>>>, @@ -115,6 +126,8 @@ impl AttestationServiceBuil .attestation_data_service .ok_or("Cannot build AttestationService without attestation_data_service")?, head_monitor_rx: self.head_monitor_rx, + latest_target_checkpoint: self.latest_target_checkpoint, + consensus_threshold: self.consensus_threshold, disable: self.disable, latest_attested_slot: Mutex::new(Slot::default()), }), @@ -132,6 +145,8 @@ pub struct Inner { chain_spec: Arc, head_monitor_rx: Option>>>, attestation_data_service: Arc>, + latest_target_checkpoint: Arc>>, + consensus_threshold: Option, disable: bool, latest_attested_slot: Mutex, } @@ -170,6 +185,13 @@ impl AttestationService AttestationService AttestationService *last_slot = current_slot, Err(e) => { crit!(error = e, "Failed to spawn attestation tasks") @@ -247,7 +274,7 @@ impl AttestationService, + attestation_data_strategy: AttestationDataStrategy, ) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self @@ -265,11 +292,66 @@ impl AttestationService Date: Tue, 9 Dec 2025 14:26:18 -0300 Subject: [PATCH 2/5] Set default threshold to 3, just for testing --- validator_client/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 4ef560c4049..df9f7dcdeec 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -516,6 +516,7 @@ impl ProductionValidatorClient { .beacon_nodes(beacon_nodes.clone()) .executor(context.executor.clone()) .head_monitor_rx(head_monitor_rx) + .consensus_threshold(3) .chain_spec(context.eth2_config.spec.clone()) .disable(config.disable_attesting); From eb07e8f6c6ed8e31702412a2ec5d42ca8bf38226 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 9 Dec 2025 15:21:01 -0300 Subject: [PATCH 3/5] Fix --- .../validator_services/src/attestation_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 55cd3e01b84..8935d1d5e12 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -309,7 +309,7 @@ impl AttestationService AttestationService Date: Wed, 10 Dec 2025 12:13:40 -0300 Subject: [PATCH 4/5] Add score aggregation --- .../src/attestation_data_service.rs | 170 +++++++++++++----- .../src/attestation_service.rs | 3 +- 2 files changed, 128 insertions(+), 45 deletions(-) diff --git a/validator_client/validator_services/src/attestation_data_service.rs b/validator_client/validator_services/src/attestation_data_service.rs index ee8078afa4c..f8360383cc0 100644 --- a/validator_client/validator_services/src/attestation_data_service.rs +++ b/validator_client/validator_services/src/attestation_data_service.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use beacon_node_fallback::BeaconNodeFallback; +use safe_arith::SafeArith; use slot_clock::SlotClock; use tracing::{Instrument, info_span}; use types::{AttestationData, Checkpoint, Epoch, Slot}; @@ -10,22 +11,123 @@ pub enum AttestationDataStrategy { Fallback, ByIndex(usize), Consensus((usize, Option<(Checkpoint, usize)>)), + HighestScore, IgnoreEpoch(Epoch), } -#[derive(Debug, PartialEq, Eq, Hash, Clone)] -pub struct FFGConsensus { - pub source: Checkpoint, - pub target: Checkpoint, +// New trait for aggregation strategies that need parallel queries +trait ResultAggregator { + /// Process a single response and decide whether to continue or stop + fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool; // Returns true if we should stop and return + + /// Get the final result after aggregation + fn get_result(&self) -> Option<(AttestationData, usize)>; +} + +// Consensus aggregator +struct ConsensusAggregator { + results: HashMap>, + threshold: usize, + target_checkpoint_and_index: Option<(Checkpoint, usize)>, + consensus_result: Option<(AttestationData, usize)>, +} + +impl ConsensusAggregator { + fn new(threshold: usize, target_checkpoint_and_index: Option<(Checkpoint, usize)>) -> Self { + Self { + results: HashMap::new(), + threshold, + target_checkpoint_and_index, + consensus_result: None, + } + } +} + +impl ResultAggregator for ConsensusAggregator { + fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool { + if let Some((target_checkpoint, preferred_index)) = self.target_checkpoint_and_index { + // If we have a preferred index set, return attestation data from it + // TODO(attestation-consensus) this is a small optimization to immediately return data + // from the preferred index. We shouldn't need to check the target checkpoint, but maybe + // its just safer to do so? + if preferred_index == index { + self.consensus_result = Some((attestation_data.clone(), index)); + return true; + } + // return if fetched data matches the target checkpoint + if attestation_data.target == target_checkpoint { + self.consensus_result = Some((attestation_data.clone(), index)); + return true; + } + } + self.results + .entry(attestation_data.target) + .or_insert_with(Vec::new) + .push(index); + + if self + .results + .get(&attestation_data.target) + .is_some_and(|servers| servers.len() >= self.threshold) + { + // Consensus has been reached + self.consensus_result = Some((attestation_data.clone(), index)); + return true; + } + + false + } + + fn get_result(&self) -> Option<(AttestationData, usize)> { + self.consensus_result.clone() + } +} + +// Score aggregator +struct ScoreAggregator { + results: HashMap, + // TODO im pretty sure the head slot is just the requested slot + // double check the attestation service before deleting this TODO. + head_slot: Slot, + responses_needed: usize, + responses_received: usize, } -impl FFGConsensus { - pub fn new(attestation_data: &AttestationData) -> Self { - FFGConsensus { - source: attestation_data.source, - target: attestation_data.target, +impl ScoreAggregator { + fn new(head_slot: Slot, responses_needed: usize) -> Self { + Self { + results: HashMap::new(), + head_slot, + responses_needed, + responses_received: 0, } } + + fn calculate_score(&self, attestation_data: &AttestationData) -> u64 { + let checkpoint_value = attestation_data.source.epoch + attestation_data.target.epoch; + let slot_value = 1 + attestation_data.slot.as_u64() - self.head_slot.as_u64(); + // TODO unwrap + checkpoint_value.as_u64() + 1.safe_div(slot_value).unwrap() + } +} + +impl ResultAggregator for ScoreAggregator { + fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool { + let score = self.calculate_score(attestation_data); + self.results + .insert(index, (score, attestation_data.clone())); + self.responses_received += 1; + + // Stop when we've received enough responses + self.responses_received >= self.responses_needed + } + + fn get_result(&self) -> Option<(AttestationData, usize)> { + self.results + .iter() + .max_by_key(|(_, (score, _))| score) + .map(|(idx, (_, data))| (data.clone(), *idx)) + } } /// The AttestationDataService is responsible for downloading and caching attestation data at a given slot. @@ -61,14 +163,11 @@ impl AttestationDataService { .map_err(|e| e.to_string()) } - pub async fn data_by_threshold( + async fn data_with_aggregation( &self, request_slot: &Slot, - threshold: usize, - checkpoint_and_index: Option<(Checkpoint, usize)>, + mut aggregator: impl ResultAggregator, ) -> Result<(AttestationData, usize), String> { - let mut results = HashMap::new(); - self.beacon_nodes .first_n_responses( |beacon_node| async move { @@ -82,37 +181,15 @@ impl AttestationDataService { .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) .map(|result| result.data) }, - |(attestation_data, index)| { - if let Some((target_checkpoint, preferred_index)) = checkpoint_and_index { - // If we have a preferred index set, return attestation data from it - // TODO(attestation-consensus) this is a small optimization to immediately return data - // from the preferred index. We shouldn't need to check the target checkpoint, but maybe - // its just safer to do so? - if preferred_index == *index { - return true; - } - // return if fetched data matches the target checkpoint - return attestation_data.target == target_checkpoint; - } - - let ffg_consensus = FFGConsensus::new(attestation_data); - results - .entry(ffg_consensus.clone()) - .or_insert_with(Vec::new) - .push(*index); - if results - .get(&ffg_consensus) - .is_some_and(|servers| servers.len() >= threshold) - { - return true; - } - - false - }, + |(attestation_data, index)| aggregator.process_result(attestation_data, *index), ) .instrument(info_span!("fetch_attestation_data")) .await - .map_err(|e| e.to_string()) + .map_err(|e| e.to_string())?; + + aggregator + .get_result() + .ok_or_else(|| "No valid attestation data found".to_string()) } pub async fn download_data( @@ -126,13 +203,20 @@ impl AttestationDataService { self.data_by_index(request_slot, Some(*index)).await } AttestationDataStrategy::Consensus((threshold, checkpoint_and_index)) => { - self.data_by_threshold(request_slot, *threshold, checkpoint_and_index.clone()) + let consensus_aggregator = + ConsensusAggregator::new(*threshold, checkpoint_and_index.clone()); + self.data_with_aggregation(request_slot, consensus_aggregator) .await } AttestationDataStrategy::IgnoreEpoch(epoch) => Err(format!( "Disabled attestation production for epoch {:?}", epoch )), + AttestationDataStrategy::HighestScore => { + let aggregator = + ScoreAggregator::new(*request_slot, self.beacon_nodes.num_total().await); + self.data_with_aggregation(request_slot, aggregator).await + } } } } diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 8935d1d5e12..ab83f7e7a9b 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -344,8 +344,7 @@ impl AttestationService Date: Wed, 10 Dec 2025 16:12:01 -0300 Subject: [PATCH 5/5] fix --- .../src/attestation_service.rs | 76 +++++++++---------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index ab83f7e7a9b..0d2c5772358 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -287,53 +287,45 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); let attestation_service = self.clone(); + // If we're using the Consensus strategy we need to handle the following situations: + // - The first slot in the epoch + // - A slot thats within an attestable epoch + // - A slot thats not within an attestable epoch + // - A slot that is not the first in an epoch and no target checkpoint to compare + let attestation_data_strategy = + if let AttestationDataStrategy::Consensus((threshold, _)) = attestation_data_strategy { + if slot.is_start_slot_in_epoch(S::E::slots_per_epoch()) { + // if the current slot is the first slot in the epoch use the default consensus strategy + attestation_data_strategy + } else if let Some((latest_attestable_epoch, target_checkpoint, preferred_index)) = + *attestation_service.latest_target_checkpoint.blocking_lock() + { + if slot.epoch(S::E::slots_per_epoch()) == latest_attestable_epoch { + // If the current slot is within the latest attestable epoch, we can attest + // using the `preferred_index` or nodes that have a matching `target_checkpoint` + AttestationDataStrategy::Consensus(( + threshold, + Some((target_checkpoint, preferred_index)), + )) + } else { + // If the current slot is not within the latest attestable epoch, we cannot attest + AttestationDataStrategy::IgnoreEpoch(slot.epoch(S::E::slots_per_epoch())) + } + } else { + // If the current slot is not the first slot in an epoch and there is no target checkpoint to compare, + // run the default consensus strategy. This can happen if the attestation service was initially + // launched in the middle of an epoch. + attestation_data_strategy + } + } else { + attestation_data_strategy + }; + let attestation_data_handle = self .inner .executor .spawn_handle( async move { - // If we're using the Consensus strategy we need to handle the following situations: - // - The first slot in the epoch - // - A slot thats within an attestable epoch - // - A slot thats not within an attestable epoch - // - A slot that is not the first in an epoch and no target checkpoint to compare - let attestation_data_strategy = - if let AttestationDataStrategy::Consensus((threshold, _)) = - attestation_data_strategy - { - if slot.is_start_slot_in_epoch(S::E::slots_per_epoch()) { - // if the current slot is the first slot in the epoch use the default consensus strategy - attestation_data_strategy - } else if let Some(( - latest_attestable_epoch, - target_checkpoint, - preferred_index, - )) = - *attestation_service.latest_target_checkpoint.lock().await - { - if slot.epoch(S::E::slots_per_epoch()) == latest_attestable_epoch { - // If the current slot is within the latest attestable epoch, we can attest - // using the `preferred_index` or nodes that have a matching `target_checkpoint` - AttestationDataStrategy::Consensus(( - threshold, - Some((target_checkpoint, preferred_index)), - )) - } else { - // If the current slot is not within the latest attestable epoch, we cannot attest - AttestationDataStrategy::IgnoreEpoch( - slot.epoch(S::E::slots_per_epoch()), - ) - } - } else { - // If the current slot is not the first slot in an epoch and there is no target checkpoint to compare, - // run the default consensus strategy. This can happen if the attestation service was initially - // launched in the middle of an epoch. - attestation_data_strategy - } - } else { - attestation_data_strategy - }; - let (attestation_data, index) = attestation_service .attestation_data_service .download_data(&slot, &attestation_data_strategy)