4 changes: 4 additions & 0 deletions consensus/types/src/slot_epoch.rs
@@ -57,6 +57,10 @@ impl Slot {
pub fn max_value() -> Slot {
Slot(u64::MAX)
}

    /// Returns `true` if this slot is the first slot of its epoch.
    pub fn is_start_slot_in_epoch(&self, slots_per_epoch: u64) -> bool {
        self.0.is_multiple_of(slots_per_epoch)
    }
}

impl Epoch {
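A quick sketch of how the new helper behaves (hypothetical values, assuming the mainnet preset of 32 slots per epoch; note that `u64::is_multiple_of` stabilized in Rust 1.87, so this relies on a recent toolchain):

```rust
use types::Slot;

// Slot 64 is the first slot of epoch 2 when slots_per_epoch = 32.
assert!(Slot::new(64).is_start_slot_in_epoch(32));
// Slot 65 falls inside epoch 2, but not at its start.
assert!(!Slot::new(65).is_start_slot_in_epoch(32));
```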
43 changes: 43 additions & 0 deletions validator_client/beacon_node_fallback/src/lib.rs
@@ -652,6 +652,49 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
.map(|(val, _)| val)
}

    /// Run `fetch_func` against each candidate in turn, passing every successful
    /// response to `consensus_check`. Returns as soon as the check passes, or
    /// returns all the errors encountered if no response ever satisfies it.
    pub async fn first_n_responses<F, O, Err, R, C>(
&self,
fetch_func: F,
mut consensus_check: C,
) -> Result<(O, usize), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
C: FnMut(&(O, usize)) -> bool,
O: Eq + Clone + Debug,
Err: Debug,
{
let mut errors = vec![];

        // Build a future for each candidate; responses are awaited one at a time below.
let candidates = self.candidates.read().await;
let mut futures = vec![];

for candidate in candidates.iter() {
futures.push(Self::run_on_candidate(
candidate.beacon_node.clone(),
&fetch_func,
));
}
        // Release the read lock on the candidate list before awaiting.
        drop(candidates);

// Process futures sequentially, checking consensus after each response
for future in futures {
match future.await {
Ok(val) => {
if consensus_check(&val) {
return Ok(val);
}
}
Err(e) => {
errors.push(e);
}
}
}

Err(Errors(errors))
}

/// Run `func` against each candidate in `self`, returning immediately if a result is found.
/// Otherwise, return all the errors encountered along the way.
pub async fn first_success_with_index<F, O, Err, R>(
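A sketch of how a caller can drive `first_n_responses` (hypothetical: `fallback` is a `BeaconNodeFallback`, `get_value` stands in for a real beacon node HTTP call returning a `u64`, and the closure stops once three nodes return the same value):

```rust
use std::collections::HashMap;

let mut tally: HashMap<u64, usize> = HashMap::new();
let (value, node_index) = fallback
    .first_n_responses(
        |node| async move { get_value(&node).await },
        |(value, _index)| {
            // Count agreeing responses; stop once three nodes match.
            let count = tally.entry(*value).or_insert(0);
            *count += 1;
            *count >= 3
        },
    )
    .await?;
```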
1 change: 1 addition & 0 deletions validator_client/src/lib.rs
@@ -516,6 +516,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.beacon_nodes(beacon_nodes.clone())
.executor(context.executor.clone())
.head_monitor_rx(head_monitor_rx)
.consensus_threshold(3)
.chain_spec(context.eth2_config.spec.clone())
.disable(config.disable_attesting);

205 changes: 195 additions & 10 deletions validator_client/validator_services/src/attestation_data_service.rs
@@ -1,9 +1,134 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use beacon_node_fallback::BeaconNodeFallback;
use safe_arith::SafeArith;
use slot_clock::SlotClock;
use tracing::{Instrument, info_span};
use types::{AttestationData, Slot};
use types::{AttestationData, Checkpoint, Epoch, Slot};

/// Strategy used to select attestation data from the available beacon nodes.
#[derive(Debug, Clone)]
pub enum AttestationDataStrategy {
    /// Try each node in order and return the first successful response.
    Fallback,
    /// Query the node at this index first, falling back to the others on failure.
    ByIndex(usize),
    /// Query nodes until `threshold` of them agree on a target checkpoint,
    /// optionally short-circuiting on a known checkpoint and preferred node.
    Consensus((usize, Option<(Checkpoint, usize)>)),
    /// Query every node and return the highest-scoring response.
    HighestScore,
    /// Refuse to produce attestation data for this epoch.
    IgnoreEpoch(Epoch),
}

// Trait for strategies that aggregate attestation data from multiple beacon nodes.
trait ResultAggregator {
    /// Process a single response. Returns `true` if aggregation should stop
    /// and the current result be returned.
    fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool;

/// Get the final result after aggregation
fn get_result(&self) -> Option<(AttestationData, usize)>;
}

// Aggregates responses until `threshold` nodes agree on the same target checkpoint.
struct ConsensusAggregator {
results: HashMap<Checkpoint, Vec<usize>>,
threshold: usize,
target_checkpoint_and_index: Option<(Checkpoint, usize)>,
consensus_result: Option<(AttestationData, usize)>,
}

impl ConsensusAggregator {
fn new(threshold: usize, target_checkpoint_and_index: Option<(Checkpoint, usize)>) -> Self {
Self {
results: HashMap::new(),
threshold,
target_checkpoint_and_index,
consensus_result: None,
}
}
}

impl ResultAggregator for ConsensusAggregator {
fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool {
        if let Some((target_checkpoint, preferred_index)) = self.target_checkpoint_and_index {
            // If we have a preferred index set, return attestation data from it.
            // TODO(attestation-consensus): this is a small optimization to immediately return
            // data from the preferred index. We shouldn't need to check the target checkpoint,
            // but maybe it's just safer to do so?
if preferred_index == index {
self.consensus_result = Some((attestation_data.clone(), index));
return true;
}
            // Return early if the fetched data matches the known target checkpoint.
if attestation_data.target == target_checkpoint {
self.consensus_result = Some((attestation_data.clone(), index));
return true;
}
}
        self.results
            .entry(attestation_data.target)
            .or_default()
            .push(index);

if self
.results
.get(&attestation_data.target)
.is_some_and(|servers| servers.len() >= self.threshold)
{
// Consensus has been reached
self.consensus_result = Some((attestation_data.clone(), index));
return true;
}

false
}

fn get_result(&self) -> Option<(AttestationData, usize)> {
self.consensus_result.clone()
}
}
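To make the stopping condition concrete, a hypothetical walkthrough with `threshold = 3` and no preferred checkpoint, where `data_a` and `data_b` stand in for `AttestationData` values whose target checkpoints differ:

```rust
let mut agg = ConsensusAggregator::new(3, None);
assert!(!agg.process_result(&data_a, 0)); // results = {a: [0]}
assert!(!agg.process_result(&data_b, 1)); // results = {a: [0], b: [1]}
assert!(!agg.process_result(&data_a, 2)); // results = {a: [0, 2], b: [1]}
assert!(agg.process_result(&data_a, 3)); // target a reaches the threshold
assert_eq!(agg.get_result().map(|(_, idx)| idx), Some(3));
```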

// Aggregates responses from every node and keeps the highest-scoring one.
struct ScoreAggregator {
results: HashMap<usize, (u64, AttestationData)>,
    // TODO: I'm pretty sure the head slot is just the requested slot;
    // double-check the attestation service before deleting this TODO.
head_slot: Slot,
responses_needed: usize,
responses_received: usize,
}

impl ScoreAggregator {
fn new(head_slot: Slot, responses_needed: usize) -> Self {
Self {
results: HashMap::new(),
head_slot,
responses_needed,
responses_received: 0,
}
}

    fn calculate_score(&self, attestation_data: &AttestationData) -> u64 {
        let checkpoint_value = attestation_data.source.epoch + attestation_data.target.epoch;
        // Saturate to avoid underflow if the data's slot is behind the head slot.
        let slot_value = 1 + attestation_data
            .slot
            .as_u64()
            .saturating_sub(self.head_slot.as_u64());
        // `slot_value` is always >= 1, so the safe division cannot fail.
        checkpoint_value.as_u64() + 1u64.safe_div(slot_value).unwrap_or(0)
    }
}

impl ResultAggregator for ScoreAggregator {
fn process_result(&mut self, attestation_data: &AttestationData, index: usize) -> bool {
let score = self.calculate_score(attestation_data);
self.results
.insert(index, (score, attestation_data.clone()));
self.responses_received += 1;

// Stop when we've received enough responses
self.responses_received >= self.responses_needed
}

fn get_result(&self) -> Option<(AttestationData, usize)> {
self.results
.iter()
.max_by_key(|(_, (score, _))| score)
.map(|(idx, (_, data))| (data.clone(), *idx))
}
}
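A worked example of the scoring with hypothetical numbers: for source epoch 10, target epoch 11, and data produced exactly at the head slot, `slot_value = 1` and the score is `10 + 11 + 1/1 = 22`. For data even one slot behind the head, `1 / slot_value` truncates to `0` in integer math, so the freshness term only rewards data at the head slot itself; anything older ties on the checkpoint epochs alone.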

/// The AttestationDataService is responsible for downloading and caching attestation data at a given slot.
/// It also helps prevent us from re-downloading identical attestation data.
@@ -16,13 +141,12 @@ impl<T: SlotClock> AttestationDataService<T> {
Self { beacon_nodes }
}

pub async fn download_data(
    /// Fetch attestation data via the standard fallback flow, optionally trying
    /// the node at `candidate_beacon_node` first.
    async fn data_by_index(
&self,
request_slot: &Slot,
candidate_beacon_node: Option<usize>,
) -> Result<(AttestationData, usize), String> {
let (attestation_data, node_index) = self
.beacon_nodes
self.beacon_nodes
.first_success_from_index(candidate_beacon_node, |beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
@@ -36,9 +160,64 @@ impl<T: SlotClock> AttestationDataService<T> {
})
.instrument(info_span!("fetch_attestation_data"))
.await
.map_err(|e| e.to_string())
}

    /// Fetch attestation data from the beacon nodes, feeding each response to
    /// `aggregator` until it signals completion, then return its final result.
    async fn data_with_aggregation(
&self,
request_slot: &Slot,
mut aggregator: impl ResultAggregator,
) -> Result<(AttestationData, usize), String> {
self.beacon_nodes
.first_n_responses(
|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(*request_slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
},
|(attestation_data, index)| aggregator.process_result(attestation_data, *index),
)
.instrument(info_span!("fetch_attestation_data"))
.await
.map_err(|e| e.to_string())?;

Ok((attestation_data, node_index))
aggregator
.get_result()
.ok_or_else(|| "No valid attestation data found".to_string())
}

    /// Download attestation data for `request_slot` using the given strategy.
    pub async fn download_data(
&self,
request_slot: &Slot,
strategy: &AttestationDataStrategy,
) -> Result<(AttestationData, usize), String> {
match strategy {
AttestationDataStrategy::Fallback => self.data_by_index(request_slot, None).await,
AttestationDataStrategy::ByIndex(index) => {
self.data_by_index(request_slot, Some(*index)).await
}
AttestationDataStrategy::Consensus((threshold, checkpoint_and_index)) => {
let consensus_aggregator =
ConsensusAggregator::new(*threshold, checkpoint_and_index.clone());
self.data_with_aggregation(request_slot, consensus_aggregator)
.await
}
AttestationDataStrategy::IgnoreEpoch(epoch) => Err(format!(
"Disabled attestation production for epoch {:?}",
epoch
)),
AttestationDataStrategy::HighestScore => {
let aggregator =
ScoreAggregator::new(*request_slot, self.beacon_nodes.num_total().await);
self.data_with_aggregation(request_slot, aggregator).await
}
}
}
}
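A hypothetical call site, putting the strategies together (a sketch only: it assumes `service: AttestationDataService<T>` and `slot: Slot` are in scope, as in the tests below, and the `Consensus` threshold mirrors the hardcoded value of 3 wired up in `validator_client/src/lib.rs` above):

```rust
// Require three nodes to agree on a target checkpoint, with no preferred node yet.
let strategy = AttestationDataStrategy::Consensus((3, None));
let (data, node_index) = service.download_data(&slot, &strategy).await?;

// Or pick the best-scoring response across every connected node:
let (data, node_index) = service
    .download_data(&slot, &AttestationDataStrategy::HighestScore)
    .await?;
```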

@@ -54,7 +233,7 @@ mod tests {
MinimalEthSpec, Slot,
};

use crate::attestation_data_service::AttestationDataService;
use crate::attestation_data_service::{AttestationDataService, AttestationDataStrategy};

fn create_attestation_data(
slot: Slot,
@@ -156,7 +335,9 @@ mod tests {
));

let service = AttestationDataService::<TestingSlotClock>::new(Arc::new(fallback));
let result = service.download_data(&slot, None).await;
let result = service
.download_data(&slot, &AttestationDataStrategy::Fallback)
.await;

// Verify download is successful
assert!(result.is_ok());
@@ -186,7 +367,9 @@ mod tests {
));

let service = AttestationDataService::<TestingSlotClock>::new(Arc::new(fallback));
let result = service.download_data(&slot, None).await;
let result = service
.download_data(&slot, &AttestationDataStrategy::Fallback)
.await;

// Verify all nodes offline
assert!(result.is_err());
@@ -224,7 +407,9 @@ mod tests {
));

let service = AttestationDataService::<TestingSlotClock>::new(Arc::new(fallback));
let result = service.download_data(&slot, None).await;
let result = service
.download_data(&slot, &AttestationDataStrategy::Fallback)
.await;

// Verify download is successful and we fell back to the next node
assert!(result.is_ok());