diff --git a/Cargo.lock b/Cargo.lock index a1ad2ab5ba7..4a0a54392e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9903,8 +9903,11 @@ dependencies = [ "futures", "graffiti_file", "logging", + "mockito", "parking_lot", + "regex", "safe_arith", + "serde_json", "slot_clock", "task_executor", "tokio", diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 0f13d8c8b7b..07d1462a75d 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -17,6 +17,7 @@ use std::cmp::Ordering; use std::fmt; use std::fmt::Debug; use std::future::Future; +use std::hash::Hash; use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec::Vec; @@ -613,7 +614,10 @@ impl BeaconNodeFallback { // Run `func` using a `candidate`, returning the value or capturing errors. for candidate in candidates.iter() { - futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func)); + futures.push(Self::run_on_beacon_node( + candidate.beacon_node.clone(), + &func, + )); } drop(candidates); @@ -631,7 +635,10 @@ impl BeaconNodeFallback { // Run `func` using a `candidate`, returning the value or capturing errors. for candidate in candidates.iter() { - futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func)); + futures.push(Self::run_on_beacon_node( + candidate.beacon_node.clone(), + &func, + )); } drop(candidates); @@ -646,17 +653,59 @@ impl BeaconNodeFallback { Err(Errors(errors)) } - /// Run the future `func` on `candidate` while reporting metrics. + pub async fn first_n_responses( + &self, + fetch_func: F, + mut consensus_check: C, + ) -> Result> + where + F: Fn(CandidateBeaconNode) -> R, + R: Future>, + C: FnMut(&O) -> bool, + O: Eq + Hash + Clone + Debug, + Err: Debug, + { + let mut errors = vec![]; + + // Collect all responses from all candidates + let candidates = self.candidates.read().await; + let mut futures = vec![]; + + for candidate in candidates.iter() { + futures.push(Self::run_on_candidate(candidate.clone(), &fetch_func)); + } + drop(candidates); + + // Process futures sequentially, checking consensus after each response + for future in futures { + match future.await { + Ok(val) => { + if consensus_check(&val) { + return Ok(val); + } + } + Err(e) => { + errors.push(e); + } + } + } + + Err(Errors(errors)) + } + async fn run_on_candidate( - candidate: BeaconNodeHttpClient, + candidate: CandidateBeaconNode, func: F, ) -> Result)> where - F: Fn(BeaconNodeHttpClient) -> R, + F: Fn(CandidateBeaconNode) -> R, R: Future>, Err: Debug, { - inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.server().redacted()]); + inc_counter_vec( + &ENDPOINT_REQUESTS, + &[candidate.beacon_node.server().redacted()], + ); // There exists a race condition where `func` may be called when the candidate is // actually not ready. We deem this an acceptable inefficiency. @@ -664,12 +713,43 @@ impl BeaconNodeFallback { Ok(val) => Ok(val), Err(e) => { debug!( - node = %candidate, + node = %candidate.beacon_node, error = ?e, "Request to beacon node failed" ); - inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.server().redacted()]); - Err((candidate.to_string(), Error::RequestFailed(e))) + inc_counter_vec( + &ENDPOINT_ERRORS, + &[candidate.beacon_node.server().redacted()], + ); + Err((candidate.beacon_node.to_string(), Error::RequestFailed(e))) + } + } + } + + /// Run the future `func` on `beacon_node` while reporting metrics. + async fn run_on_beacon_node( + beacon_node: BeaconNodeHttpClient, + func: F, + ) -> Result)> + where + F: Fn(BeaconNodeHttpClient) -> R, + R: Future>, + Err: Debug, + { + inc_counter_vec(&ENDPOINT_REQUESTS, &[beacon_node.server().redacted()]); + + // There exists a race condition where `func` may be called when the candidate is + // actually not ready. We deem this an acceptable inefficiency. + match func(beacon_node.clone()).await { + Ok(val) => Ok(val), + Err(e) => { + debug!( + node = %beacon_node, + error = ?e, + "Request to beacon node failed" + ); + inc_counter_vec(&ENDPOINT_ERRORS, &[beacon_node.server().redacted()]); + Err((beacon_node.to_string(), Error::RequestFailed(e))) } } } @@ -693,7 +773,10 @@ impl BeaconNodeFallback { // Run `func` using a `candidate`, returning the value or capturing errors. for candidate in candidates.iter() { - futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func)); + futures.push(Self::run_on_beacon_node( + candidate.beacon_node.clone(), + &func, + )); } drop(candidates); @@ -1072,4 +1155,255 @@ mod tests { mock1.expect(3).assert(); mock2.expect(3).assert(); } + + #[tokio::test] + async fn first_n_responses_first_response_passes_consensus() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_online_node(); + let mock2 = mock_beacon_node_2.mock_online_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + // Use a closure that returns a simple u32 that implements Eq + Hash + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| true, // All responses pass consensus + ) + .await; + + match result { + Ok(val) => assert_eq!(val, 1), + Err(_) => panic!("Expected Ok result"), + } + // Only the first node should be called since it passes consensus + mock1.expect(1).assert(); + mock2.expect(0).assert(); + mock3.expect(0).assert(); + } + + #[tokio::test] + async fn first_n_responses_second_response_passes_consensus() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_online_node(); + let mock2 = mock_beacon_node_2.mock_online_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + let mut call_count = 0; + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| { + call_count += 1; + call_count == 2 // Only second response passes consensus + }, + ) + .await; + + match result { + Ok(val) => assert_eq!(val, 1), + Err(_) => panic!("Expected Ok result"), + } + // First two nodes should be called + mock1.expect(1).assert(); + mock2.expect(1).assert(); + mock3.expect(0).assert(); + } + + #[tokio::test] + async fn first_n_responses_no_consensus_all_succeed() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_online_node(); + let mock2 = mock_beacon_node_2.mock_online_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| false, // No response passes consensus + ) + .await; + + assert!(result.is_err()); + // All nodes should be tried + mock1.expect(1).assert(); + mock2.expect(1).assert(); + mock3.expect(1).assert(); + + // Since all succeeded but none passed consensus, errors list should be empty + let errors = result.unwrap_err(); + assert_eq!(errors.num_errors(), 0); + } + + #[tokio::test] + async fn first_n_responses_all_fail() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_offline_node(); + let mock2 = mock_beacon_node_2.mock_offline_node(); + let mock3 = mock_beacon_node_3.mock_offline_node(); + + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| true, // Consensus check passes, but all requests fail + ) + .await; + + assert!(result.is_err()); + mock1.expect(1).assert(); + mock2.expect(1).assert(); + mock3.expect(1).assert(); + + // All three nodes failed, so we should have 3 errors + let errors = result.unwrap_err(); + assert_eq!(errors.num_errors(), 3); + } + + #[tokio::test] + async fn first_n_responses_mixed_errors_and_success() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_offline_node(); + let mock2 = mock_beacon_node_2.mock_offline_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| true, // All successful responses pass consensus + ) + .await; + + match result { + Ok(val) => assert_eq!(val, 1), + Err(_) => panic!("Expected Ok result"), + } + mock1.expect(1).assert(); + mock2.expect(1).assert(); + mock3.expect(1).assert(); + } + + #[tokio::test] + async fn first_n_responses_third_response_passes_after_errors() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_offline_node(); + let mock2 = mock_beacon_node_2.mock_online_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + let mut success_count = 0; + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| { + success_count += 1; + success_count == 2 // Second successful response passes consensus + }, + ) + .await; + + match result { + Ok(val) => assert_eq!(val, 1), + Err(_) => panic!("Expected Ok result"), + } + mock1.expect(1).assert(); + mock2.expect(1).assert(); + mock3.expect(1).assert(); + } + + #[tokio::test] + async fn first_n_responses_empty_candidates() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + + let beacon_node_fallback = create_beacon_node_fallback(vec![], vec![], spec.clone()); + + let result = beacon_node_fallback + .first_n_responses( + |client| async move { + client.beacon_node.get_node_version().await?; + Ok::(1) + }, + |_| true, + ) + .await; + + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert_eq!(errors.num_errors(), 0); + } } diff --git a/validator_client/validator_metrics/src/lib.rs b/validator_client/validator_metrics/src/lib.rs index 060d8a4edd2..f5767fc408e 100644 --- a/validator_client/validator_metrics/src/lib.rs +++ b/validator_client/validator_metrics/src/lib.rs @@ -168,6 +168,14 @@ pub static PROPOSAL_CHANGED: LazyLock> = LazyLock::new(|| { "A duties update discovered a new block proposer for the current slot", ) }); + +pub static ATTESTATION_CONSENSUS_FAILURES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "vc_attestation_consensus_failures", + "Number of attestation consensus failures", + &["task"], + ) +}); /* * Endpoint metrics */ diff --git a/validator_client/validator_services/Cargo.toml b/validator_client/validator_services/Cargo.toml index c9149409148..5fc245cd9b9 100644 --- a/validator_client/validator_services/Cargo.toml +++ b/validator_client/validator_services/Cargo.toml @@ -22,3 +22,8 @@ tree_hash = { workspace = true } types = { workspace = true } validator_metrics = { workspace = true } validator_store = { workspace = true } + +[dev-dependencies] +mockito = { workspace = true } +regex = { workspace = true } +serde_json = { workspace = true } diff --git a/validator_client/validator_services/src/attestation_data_service.rs b/validator_client/validator_services/src/attestation_data_service.rs new file mode 100644 index 00000000000..294b16d4768 --- /dev/null +++ b/validator_client/validator_services/src/attestation_data_service.rs @@ -0,0 +1,752 @@ +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; + +use beacon_node_fallback::BeaconNodeFallback; +use slot_clock::SlotClock; +use tracing::{error, instrument, warn}; +use types::{AttestationData, Checkpoint, Epoch, EthSpec, Slot}; + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct FFGConsensus { + pub source: Checkpoint, + pub target: Checkpoint, +} + +impl FFGConsensus { + pub fn new(attestation_data: &AttestationData) -> Self { + FFGConsensus { + source: attestation_data.source, + target: attestation_data.target, + } + } +} + +/// The AttestationDataService is responsible for downloading and caching attestation data at a given slot. +/// It also helps prevent us from re-downloading identical attestation data. +pub struct AttestationDataService { + most_recent_epoch: Option, + latest_attestable_epoch: Option, + attestation_data_map: HashMap, + valid_candidates: Option>, + beacon_nodes: Arc>, + phantom: PhantomData, +} + +impl AttestationDataService { + pub fn new(beacon_nodes: Arc>) -> Self { + Self { + most_recent_epoch: None, + latest_attestable_epoch: None, + attestation_data_map: HashMap::new(), + valid_candidates: None, + beacon_nodes, + phantom: PhantomData, + } + } + + /// Get previously downloaded attestation data. If the Electra fork is enabled + /// we don't care about the committee index. If we're pre-Electra, we insert + /// the correct committee index. + pub fn get_cached_attestation_data(&self, slot: &Slot) -> Option { + self.attestation_data_map.get(slot).cloned() + } + + fn prune_service(&mut self) { + self.attestation_data_map.clear(); + self.valid_candidates = None; + } + + /// Download attestation data for this slot/committee index from the beacon node. + #[instrument(skip(self), fields(slot = %slot, consensus_threshold, use_valid_candidates))] + pub async fn download_data( + &mut self, + slot: &Slot, + consensus_threshold: usize, + use_valid_candidates: bool, + ) -> Result<(), String> { + let mut results = HashMap::new(); + + // Prune data after each epoch + if let Some(most_recent_epoch) = self.most_recent_epoch + && slot.epoch(E::slots_per_epoch()) > most_recent_epoch + { + self.prune_service(); + } + + self.most_recent_epoch = Some(slot.epoch(E::slots_per_epoch())); + + // If we've already downloaded attestation data for this slot, there's no need to re-download the data. + if self.get_cached_attestation_data(slot).is_some() { + return Ok(()); + } + + let valid_candidates = self.valid_candidates.clone(); + + let attestation_data_result = self + .beacon_nodes + .first_n_responses( + |candidate| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + + candidate + .beacon_node + .get_validator_attestation_data(*slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| (result.data, candidate.index)) + }, + |(attestation_data, index)| { + // If `use_valid_candidates` is enabled, only accept attestation data from valid candidate indices + if use_valid_candidates { + if let Some(valid_candidates) = &valid_candidates { + if !valid_candidates.contains(index) { + return false; + } + } else { + return false; + } + } + + let ffg_consensus = FFGConsensus::new(attestation_data); + + results + .entry(ffg_consensus.clone()) + .or_insert_with(Vec::new) + .push(*index); + if results + .get(&ffg_consensus) + .is_some_and(|servers| servers.len() >= consensus_threshold) + { + self.latest_attestable_epoch = Some(slot.epoch(E::slots_per_epoch())); + return true; + } + + false + }, + ) + .await + .map(|(attestation_data, _)| attestation_data); + + match attestation_data_result { + Ok(attestation_data) => { + let ffg_consensus = FFGConsensus::new(&attestation_data); + + if let Some(indices) = results.get(&ffg_consensus) { + self.valid_candidates = Some(indices.clone()); + self.attestation_data_map.insert(*slot, attestation_data); + } else { + // This should never happen + error!( + "A valid attestation data was constructed but the relevant candidate indices were not found" + ); + return Err("Unable to find valid candidate indices".to_string()); + } + + Ok(()) + } + Err(e) => { + validator_metrics::inc_counter_vec( + &validator_metrics::ATTESTATION_CONSENSUS_FAILURES, + &[validator_metrics::ATTESTATIONS], + ); + warn!( + response_counts = ?results + .iter() + .map(|(k, v)| (k, v.len())) + .collect::>(), + "Consensus threshold not met" + ); + if results.is_empty() { + Err(format!( + "All beacon nodes failed to provide attestation data: {}", + e + )) + } else { + Err(format!( + "Consensus threshold {} not met. Responses by server: {:?}. Network errors: {}", + consensus_threshold, results, e + )) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use beacon_node_fallback::{BeaconNodeFallback, CandidateBeaconNode, Config as FallbackConfig}; + use eth2::SensitiveUrl; + use eth2::Timeouts; + use slot_clock::TestingSlotClock; + use std::time::Duration; + use types::{FixedBytesExtended, Hash256, MainnetEthSpec}; + + type E = MainnetEthSpec; + + fn create_attestation_data( + slot: Slot, + source_epoch: Epoch, + target_epoch: Epoch, + ) -> AttestationData { + AttestationData { + slot, + index: 0, + beacon_block_root: Hash256::ZERO, + source: Checkpoint { + epoch: source_epoch, + root: Hash256::ZERO, + }, + target: Checkpoint { + epoch: target_epoch, + root: Hash256::from_low_u64_be(target_epoch.as_u64()), + }, + } + } + + fn create_test_beacon_node_fallback() -> BeaconNodeFallback { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let url = SensitiveUrl::parse("http://localhost:5052").unwrap(); + let client = + eth2::BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(1))); + let candidate = CandidateBeaconNode::new(client, 0); + + let mut fallback = + BeaconNodeFallback::new(vec![candidate], FallbackConfig::default(), vec![], spec); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + fallback + } + + #[test] + fn test_ffg_consensus_new() { + let attestation_data = create_attestation_data(Slot::new(10), Epoch::new(1), Epoch::new(2)); + let ffg = FFGConsensus::new(&attestation_data); + + assert_eq!(ffg.source, attestation_data.source); + assert_eq!(ffg.target, attestation_data.target); + } + + #[test] + fn test_ffg_consensus_equality() { + let attestation_data_1 = + create_attestation_data(Slot::new(10), Epoch::new(1), Epoch::new(2)); + let attestation_data_2 = + create_attestation_data(Slot::new(11), Epoch::new(1), Epoch::new(2)); + + let ffg_1 = FFGConsensus::new(&attestation_data_1); + let ffg_2 = FFGConsensus::new(&attestation_data_2); + + // Should be equal even though slots differ, because source/target are the same + assert_eq!(ffg_1, ffg_2); + } + + #[test] + fn test_new_service() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + assert_eq!(service.most_recent_epoch, None); + assert_eq!(service.latest_attestable_epoch, None); + assert!(service.attestation_data_map.is_empty()); + assert_eq!(service.valid_candidates, None); + } + + #[test] + fn test_get_cached_attestation_data_empty() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + assert_eq!(service.get_cached_attestation_data(&Slot::new(1)), None); + } + + #[test] + fn test_get_cached_attestation_data_returns_cached() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + service + .attestation_data_map + .insert(slot, attestation_data.clone()); + + let cached = service.get_cached_attestation_data(&slot); + assert!(cached.is_some()); + assert_eq!(cached.unwrap(), attestation_data); + } + + #[test] + fn test_prune_service_clears_data() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + // Add some data + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + service.attestation_data_map.insert(slot, attestation_data); + service.valid_candidates = Some(vec![0, 1]); + + // Prune + service.prune_service(); + + assert!(service.attestation_data_map.is_empty()); + assert_eq!(service.valid_candidates, None); + } + + #[test] + fn test_valid_candidates_tracked_correctly() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + // Initially None + assert_eq!(service.valid_candidates, None); + + // Can be set + service.valid_candidates = Some(vec![0, 1, 2]); + assert_eq!(service.valid_candidates, Some(vec![0, 1, 2])); + + // Can be updated + service.valid_candidates = Some(vec![1]); + assert_eq!(service.valid_candidates, Some(vec![1])); + + // Pruning clears it + service.prune_service(); + assert_eq!(service.valid_candidates, None); + } + + #[test] + fn test_most_recent_epoch_tracking() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + assert_eq!(service.most_recent_epoch, None); + + // Set most recent epoch + service.most_recent_epoch = Some(Epoch::new(5)); + assert_eq!(service.most_recent_epoch, Some(Epoch::new(5))); + } + + #[test] + fn test_latest_attestable_epoch_tracking() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + assert_eq!(service.latest_attestable_epoch, None); + + // Set latest attestable epoch + service.latest_attestable_epoch = Some(Epoch::new(3)); + assert_eq!(service.latest_attestable_epoch, Some(Epoch::new(3))); + } + + #[test] + fn test_attestation_data_map_operations() { + let beacon_node_fallback = create_test_beacon_node_fallback(); + let mut service = + AttestationDataService::::new(Arc::new(beacon_node_fallback)); + + let slot1 = Slot::new(10); + let slot2 = Slot::new(20); + let attestation_data1 = create_attestation_data(slot1, Epoch::new(0), Epoch::new(1)); + let attestation_data2 = create_attestation_data(slot2, Epoch::new(1), Epoch::new(2)); + + // Insert data + service + .attestation_data_map + .insert(slot1, attestation_data1.clone()); + service + .attestation_data_map + .insert(slot2, attestation_data2.clone()); + + // Retrieve data + assert_eq!( + service.get_cached_attestation_data(&slot1), + Some(attestation_data1) + ); + assert_eq!( + service.get_cached_attestation_data(&slot2), + Some(attestation_data2) + ); + + // Non-existent slot returns None + assert_eq!(service.get_cached_attestation_data(&Slot::new(30)), None); + } + + // Helper to create a beacon node with mocked attestation endpoint + async fn create_mocked_beacon_node( + index: usize, + slot: Slot, + attestation_data: AttestationData, + ) -> (mockito::ServerGuard, CandidateBeaconNode) { + use eth2::types::GenericResponse; + use mockito::{Matcher, Server}; + use regex::Regex; + + let mut server = Server::new_async().await; + let data = GenericResponse::from(attestation_data); + + let path_pattern = Regex::new(&format!( + r"^/eth/v1/validator/attestation_data\?slot={}&committee_index=0$", + slot.as_u64() + )) + .unwrap(); + + server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(200) + .with_body(serde_json::to_string(&data).unwrap()) + .create(); + + let url = SensitiveUrl::parse(&server.url()).unwrap(); + let client = + eth2::BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(1))); + let candidate = CandidateBeaconNode::new(client, index); + + (server, candidate) + } + + async fn create_offline_beacon_node( + index: usize, + ) -> (mockito::ServerGuard, CandidateBeaconNode) { + use mockito::{Matcher, Server}; + use regex::Regex; + + let mut server = Server::new_async().await; + let path_pattern = Regex::new(r"^/eth/v1/validator/attestation_data").unwrap(); + + server + .mock("GET", Matcher::Regex(path_pattern.to_string())) + .with_status(500) + .create(); + + let url = SensitiveUrl::parse(&server.url()).unwrap(); + let client = + eth2::BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(1))); + let candidate = CandidateBeaconNode::new(client, index); + + (server, candidate) + } + + #[tokio::test] + async fn test_download_data_single_node_consensus() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + + let (_server, beacon_node) = + create_mocked_beacon_node(0, slot, attestation_data.clone()).await; + + let mut fallback = + BeaconNodeFallback::new(vec![beacon_node], FallbackConfig::default(), vec![], spec); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // Download data with consensus threshold of 1 + let result = service.download_data(&slot, 1, false).await; + + assert!(result.is_ok()); + assert_eq!(service.valid_candidates, Some(vec![0])); + assert_eq!( + service.latest_attestable_epoch, + Some(slot.epoch(E::slots_per_epoch())) + ); + + // Verify data is cached after successful download + assert_eq!( + service.get_cached_attestation_data(&slot), + Some(attestation_data) + ); + } + + #[tokio::test] + async fn test_download_data_multi_node_consensus() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + + // Create 3 nodes returning the same data + let (_server1, beacon_node_1) = + create_mocked_beacon_node(0, slot, attestation_data.clone()).await; + let (_server2, beacon_node_2) = + create_mocked_beacon_node(1, slot, attestation_data.clone()).await; + let (_server3, beacon_node_3) = + create_mocked_beacon_node(2, slot, attestation_data.clone()).await; + + let mut fallback = BeaconNodeFallback::new( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + FallbackConfig::default(), + vec![], + spec, + ); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // Download data with consensus threshold of 2 + let result = service.download_data(&slot, 2, false).await; + + assert!(result.is_ok()); + // Should have at least 2 nodes in valid_candidates + assert!(service.valid_candidates.as_ref().unwrap().len() >= 2); + + // Verify data is cached after successful download + assert_eq!( + service.get_cached_attestation_data(&slot), + Some(attestation_data) + ); + } + + #[tokio::test] + async fn test_download_data_consensus_not_reached() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + + // Create 3 nodes returning different attestation data + let attestation_data_1 = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + let attestation_data_2 = create_attestation_data(slot, Epoch::new(0), Epoch::new(2)); + let attestation_data_3 = create_attestation_data(slot, Epoch::new(1), Epoch::new(2)); + + let (_server1, beacon_node_1) = + create_mocked_beacon_node(0, slot, attestation_data_1).await; + let (_server2, beacon_node_2) = + create_mocked_beacon_node(1, slot, attestation_data_2).await; + let (_server3, beacon_node_3) = + create_mocked_beacon_node(2, slot, attestation_data_3).await; + + let mut fallback = BeaconNodeFallback::new( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + FallbackConfig::default(), + vec![], + spec, + ); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // Try to download with consensus threshold of 2 - should fail + let result = service.download_data(&slot, 2, false).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Consensus threshold")); + + // Verify no data was cached since consensus failed + assert_eq!(service.get_cached_attestation_data(&slot), None); + } + + #[tokio::test] + async fn test_download_data_all_nodes_offline() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + + let (_server1, beacon_node_1) = create_offline_beacon_node(0).await; + let (_server2, beacon_node_2) = create_offline_beacon_node(1).await; + + let mut fallback = BeaconNodeFallback::new( + vec![beacon_node_1, beacon_node_2], + FallbackConfig::default(), + vec![], + spec, + ); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + let result = service.download_data(&slot, 1, false).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().contains("All beacon nodes failed")); + + // Verify no data was cached since all nodes failed + assert_eq!(service.get_cached_attestation_data(&slot), None); + } + + #[tokio::test] + async fn test_download_data_caching_works() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + + let (_server, beacon_node) = + create_mocked_beacon_node(0, slot, attestation_data.clone()).await; + + let mut fallback = + BeaconNodeFallback::new(vec![beacon_node], FallbackConfig::default(), vec![], spec); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // First download + let result = service.download_data(&slot, 1, false).await; + assert!(result.is_ok()); + + // Verify data is cached after first download + assert_eq!( + service.get_cached_attestation_data(&slot), + Some(attestation_data.clone()) + ); + + // Second call for same slot should return Ok immediately via early return (line 78) + // without hitting the network again + let result2 = service.download_data(&slot, 1, false).await; + assert!(result2.is_ok()); + + // Data should still be cached + assert_eq!( + service.get_cached_attestation_data(&slot), + Some(attestation_data) + ); + } + + #[tokio::test] + async fn test_download_data_with_valid_candidates_filter() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let slot = Slot::new(10); + let attestation_data = create_attestation_data(slot, Epoch::new(0), Epoch::new(1)); + + let (_server1, beacon_node_1) = + create_mocked_beacon_node(0, slot, attestation_data.clone()).await; + let (_server2, beacon_node_2) = + create_mocked_beacon_node(1, slot, attestation_data.clone()).await; + + let mut fallback = BeaconNodeFallback::new( + vec![beacon_node_1, beacon_node_2], + FallbackConfig::default(), + vec![], + spec, + ); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // Set valid candidates to only node 0 + service.valid_candidates = Some(vec![0]); + + // With use_valid_candidates=true, only node 0 should be accepted + let result = service.download_data(&slot, 1, true).await; + + assert!(result.is_ok()); + assert_eq!(service.valid_candidates, Some(vec![0])); + + // Verify data is cached after successful download + assert_eq!( + service.get_cached_attestation_data(&slot), + Some(attestation_data) + ); + } + + #[tokio::test] + async fn test_download_data_prunes_on_epoch_transition() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + + // Slot in epoch 0 + let slot_epoch_0 = Slot::new(10); + let attestation_data_epoch_0 = + create_attestation_data(slot_epoch_0, Epoch::new(0), Epoch::new(1)); + + // Slot in epoch 1 + let slot_epoch_1 = Slot::new(E::slots_per_epoch()); + let attestation_data_epoch_1 = + create_attestation_data(slot_epoch_1, Epoch::new(1), Epoch::new(2)); + + let (_server1, beacon_node_1) = + create_mocked_beacon_node(0, slot_epoch_0, attestation_data_epoch_0.clone()).await; + let (_server2, beacon_node_2) = + create_mocked_beacon_node(0, slot_epoch_1, attestation_data_epoch_1.clone()).await; + + let mut fallback = BeaconNodeFallback::new( + vec![beacon_node_1], + FallbackConfig::default(), + vec![], + spec.clone(), + ); + + fallback.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + + let mut service = AttestationDataService::::new(Arc::new(fallback)); + + // Download for epoch 0 + let result = service.download_data(&slot_epoch_0, 1, false).await; + assert!(result.is_ok()); + assert_eq!(service.most_recent_epoch, Some(Epoch::new(0))); + + // Verify epoch 0 data is cached + assert_eq!( + service.get_cached_attestation_data(&slot_epoch_0), + Some(attestation_data_epoch_0) + ); + + // Now update fallback with second server for epoch 1 + let mut fallback2 = + BeaconNodeFallback::new(vec![beacon_node_2], FallbackConfig::default(), vec![], spec); + fallback2.set_slot_clock(TestingSlotClock::new( + Slot::new(1), + Duration::from_secs(0), + Duration::from_secs(12), + )); + service.beacon_nodes = Arc::new(fallback2); + + // Download for epoch 1 - should trigger pruning of epoch 0 data + let result = service.download_data(&slot_epoch_1, 1, false).await; + assert!(result.is_ok()); + assert_eq!(service.most_recent_epoch, Some(Epoch::new(1))); + + // Old epoch 0 cache should be cleared + assert_eq!(service.get_cached_attestation_data(&slot_epoch_0), None); + + // But epoch 1 data should be cached + assert_eq!( + service.get_cached_attestation_data(&slot_epoch_1), + Some(attestation_data_epoch_1) + ); + assert_eq!(service.valid_candidates, Some(vec![0])); + } +} diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index da6e8f35886..e4b3bc4f292 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,4 +1,7 @@ -use crate::duties_service::{DutiesService, DutyAndProof}; +use crate::{ + attestation_data_service::AttestationDataService, + duties_service::{DutiesService, DutyAndProof}, +}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use futures::future::join_all; use logging::crit; @@ -7,12 +10,18 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::time::{Duration, Instant, sleep, sleep_until}; +use tokio::{ + sync::RwLock, + time::{Duration, Instant, sleep, sleep_until}, +}; use tracing::{debug, error, info, trace, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +// The default consensus threshold used for the attestation data service +const DEFAULT_CONSENSUS_THRESHOLD: usize = 1; + /// Builds an `AttestationService`. #[derive(Default)] pub struct AttestationServiceBuilder { @@ -22,6 +31,7 @@ pub struct AttestationServiceBuilder beacon_nodes: Option>>, executor: Option, chain_spec: Option>, + consensus_threshold: usize, disable: bool, } @@ -34,6 +44,7 @@ impl AttestationServiceBuil beacon_nodes: None, executor: None, chain_spec: None, + consensus_threshold: 1, disable: false, } } @@ -73,6 +84,11 @@ impl AttestationServiceBuil self } + pub fn consensus_threshold(mut self, consensus_threshold: usize) -> Self { + self.consensus_threshold = consensus_threshold; + self + } + pub fn build(self) -> Result, String> { Ok(AttestationService { inner: Arc::new(Inner { @@ -95,6 +111,7 @@ impl AttestationServiceBuil .chain_spec .ok_or("Cannot build AttestationService without chain_spec")?, disable: self.disable, + consensus_threshold: self.consensus_threshold, }), }) } @@ -109,6 +126,7 @@ pub struct Inner { executor: TaskExecutor, chain_spec: Arc, disable: bool, + consensus_threshold: usize, } /// Attempts to produce attestations for all known validators 1/3rd of the way through each slot. @@ -156,13 +174,29 @@ impl AttestationService 1 { + self.spawn_consensus_threshold_task( + ¤t_slot, + attestation_data_service.clone(), + ); + trace!("Spawned consensus threshold task"); + } + sleep(duration_to_next_slot + slot_duration / 3).await; - if let Err(e) = self.spawn_attestation_tasks(slot_duration) { + if let Err(e) = self + .spawn_attestation_tasks(attestation_data_service.clone(), slot_duration) + { crit!(error = e, "Failed to spawn attestation tasks") } else { trace!("Spawned attestation tasks"); @@ -180,10 +214,38 @@ impl AttestationService>>, + ) { + let inner = self.inner.clone(); + let executor = inner.executor.clone(); + let consensus_threshold = self.consensus_threshold; + + let slot = *slot; + executor.spawn( + async move { + let mut attestation_data_service = attestation_data_service.write().await; + let _ = (*attestation_data_service) + .download_data(&slot, consensus_threshold, false) + .await; + }, + "consensus_threshold_task", + ); + } + /// For each each required attestation, spawn a new task that downloads, signs and uploads the /// attestation to the beacon node. - fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { + fn spawn_attestation_tasks( + &self, + attestation_data_service: Arc>>, + slot_duration: Duration, + ) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + let duration_to_next_slot = self .slot_clock .duration_to_next_slot() @@ -217,6 +279,7 @@ impl AttestationService AttestationService>>, slot: Slot, committee_index: CommitteeIndex, validator_duties: Vec, @@ -265,7 +329,12 @@ impl AttestationService AttestationService>>, slot: Slot, committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], @@ -344,22 +414,18 @@ impl AttestationService