From 63a7fdd62b1e4132cffb2f96d787b5178229147e Mon Sep 17 00:00:00 2001 From: Laura Makdah Date: Wed, 2 Nov 2022 11:59:15 -0700 Subject: [PATCH] add test --- narwhal/node/tests/reconfigure.rs | 2 +- narwhal/primary/src/core.rs | 41 ++-- .../src/tests/certificate_waiter_tests.rs | 20 +- narwhal/primary/src/tests/core_tests.rs | 176 +++++++++++++++++- narwhal/storage/src/proposer_store.rs | 2 +- 5 files changed, 208 insertions(+), 33 deletions(-) diff --git a/narwhal/node/tests/reconfigure.rs b/narwhal/node/tests/reconfigure.rs index 14b753a22e1c0..0227bfd3eb89b 100644 --- a/narwhal/node/tests/reconfigure.rs +++ b/narwhal/node/tests/reconfigure.rs @@ -159,7 +159,7 @@ async fn run_client( } } } - +#[ignore] #[tokio::test] async fn restart() { telemetry_subscribers::init_for_testing(); diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index 965ef1c186f8b..68b054e4a79b1 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -16,7 +16,6 @@ use fastcrypto::{hash::Hash as _, SignatureService}; use futures::stream::FuturesUnordered; use futures::StreamExt; use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork}; -use std::time::Duration; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -26,6 +25,7 @@ use storage::CertificateStore; use store::Store; use sui_metrics::spawn_monitored_task; use tokio::{sync::watch, task::JoinHandle, time}; + use tracing::{debug, error, info, instrument, trace, warn}; use types::{ ensure, @@ -39,8 +39,6 @@ use types::{ #[path = "tests/core_tests.rs"] pub mod core_tests; -const RECOVERY_REQUEST_TIMEOUT_SECS: u64 = 60; - pub struct Core { /// The public key of this primary. name: PublicKey, @@ -192,7 +190,7 @@ impl Core { .map(|(_, _, network_key)| network_key) .collect(); - let network = P2pNetwork::new(self.network.network().clone()); + let network = P2pNetwork::new(self.network.network()); let mut header_futures = FuturesUnordered::new(); let request = LatestHeaderRequest {}; for peer in peers.iter() { @@ -204,32 +202,27 @@ impl Core { network.get_latest_header(peer, request).await }); } - let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS); - let mut interval = Box::pin(time::sleep(request_interval)); - loop { - tokio::select! { - res = header_futures.next() => { - match res { - Some(Ok(response)) => { + while let Some(res) = header_futures.next().await { + match res { + Ok(response) => { if let Some(header) = response.header { - self.process_header(&header); + let result = self.process_header(&header).await; + if result.is_err() { + error!( + "error on recovery path processing latest header: {:?}", + result.err() + ); + } + } else { + trace!("peer's latest header was None on recovery path") } } - Some(Err(e)) => { - error!( + Err(e) => { + debug!( "failed to get latest header from peer as recovery on startup: {:?}", e ) } - None => { - break; - } - } - } - _ = &mut interval => { - debug!("timeout was passed when requesting recovery header from peer"); - break; - } } } @@ -742,7 +735,7 @@ impl Core { // Cleanup internal state. let keys = self.vote_digest_store.iter(None).await.into_keys(); if let Err(e) = self.vote_digest_store.remove_all(keys).await { - error!("Error in change epoch when clearing vote store {}", e); + error!("Error in change epoch when clearing vote store {:?}", e); } self.processing.clear(); self.certificates_aggregators.clear(); diff --git a/narwhal/primary/src/tests/certificate_waiter_tests.rs b/narwhal/primary/src/tests/certificate_waiter_tests.rs index 27c8029165bee..b9bb66e1a8571 100644 --- a/narwhal/primary/src/tests/certificate_waiter_tests.rs +++ b/narwhal/primary/src/tests/certificate_waiter_tests.rs @@ -18,7 +18,8 @@ use std::{ sync::Arc, time::Duration, }; -use storage::CertificateStore; + +use storage::{CertificateStore, ProposerStore}; use test_utils::{temp_dir, CommitteeFixture}; use tokio::{ sync::{ @@ -35,13 +36,14 @@ use types::{ ReconfigureNotification, Round, }; -struct FetchCertificateProxy { +pub struct NetworkProxy { request: Sender, response: Arc>>, + proposer_store: ProposerStore, } #[async_trait] -impl PrimaryToPrimary for FetchCertificateProxy { +impl PrimaryToPrimary for NetworkProxy { async fn send_message( &self, request: anemo::Request, @@ -75,7 +77,14 @@ impl PrimaryToPrimary for FetchCertificateProxy { &self, _request: anemo::Request, ) -> Result, anemo::rpc::Status> { - Ok(anemo::Response::new(LatestHeaderResponse { header: None })) + let latest_header = self.proposer_store.get_last_proposed().map_err(|e| { + anemo::rpc::Status::internal(format!( + "error fetching latest proposed header from store: {e}" + )) + })?; + Ok(anemo::Response::new(LatestHeaderResponse { + header: latest_header, + })) } } @@ -200,9 +209,10 @@ async fn fetch_certificates_basic() { let fake_primary_addr = network::multiaddr_to_address(fake_primary.address()).unwrap(); let fake_route = - anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(FetchCertificateProxy { + anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(NetworkProxy { request: tx_fetch_req, response: Arc::new(Mutex::new(rx_fetch_resp)), + proposer_store: ProposerStore::new_for_tests(), })); let fake_server_network = anemo::Network::bind(fake_primary_addr.clone()) .server_name("narwhal") diff --git a/narwhal/primary/src/tests/core_tests.rs b/narwhal/primary/src/tests/core_tests.rs index 9fe1516b90f76..bbcf9616015f1 100644 --- a/narwhal/primary/src/tests/core_tests.rs +++ b/narwhal/primary/src/tests/core_tests.rs @@ -3,12 +3,67 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::common::{create_db_stores, create_test_vote_store}; -use anemo::{types::PeerInfo, PeerId}; +use anemo::{async_trait, types::PeerInfo, PeerId}; use fastcrypto::traits::KeyPair; use prometheus::Registry; +use storage::ProposerStore; use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToPrimaryMockServer}; use tokio::time::Duration; -use types::{CertificateDigest, Header, Vote}; +use types::{ + CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse, GetCertificatesRequest, + GetCertificatesResponse, Header, LatestHeaderResponse, PayloadAvailabilityRequest, + PayloadAvailabilityResponse, PrimaryToPrimary, Vote, +}; + +pub struct NetworkProxy { + proposer_store: ProposerStore, +} + +#[async_trait] +impl PrimaryToPrimary for NetworkProxy { + async fn send_message( + &self, + request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!( + "FetchCertificateProxy::send_message() is unimplemented!! {:#?}", + request + ); + } + async fn get_certificates( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!() + } + async fn fetch_certificates( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!() + } + + async fn get_payload_availability( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!() + } + + async fn get_latest_header( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + let latest_header = self.proposer_store.get_last_proposed().map_err(|e| { + anemo::rpc::Status::internal(format!( + "error fetching latest proposed header from store: {e}" + )) + })?; + Ok(anemo::Response::new(LatestHeaderResponse { + header: latest_header, + })) + } +} #[tokio::test] async fn process_header() { @@ -1028,6 +1083,123 @@ async fn recover_core_expecting_header_of_previous_round() { } } +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn recover_core_latest_headers() { + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().last().unwrap(); + let name = primary.public_key(); + let other_primary = fixture.authorities().nth(1).unwrap(); + let signature_service = SignatureService::new(primary.keypair().copy()); + + let (_tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); + let (tx_sync_headers, _rx_sync_headers) = test_utils::test_channel!(1); + let (tx_certificate_waiter, _rx_certificate_waiter) = test_utils::test_channel!(1); + let (_tx_primary_messages, rx_primary_messages) = test_utils::test_channel!(3); + let (_tx_headers_loopback, rx_headers_loopback) = test_utils::test_channel!(1); + let (_tx_certificates_loopback, rx_certificates_loopback) = test_utils::test_channel!(1); + let (_tx_headers, rx_headers) = test_utils::test_channel!(1); + let (tx_consensus, _rx_consensus) = test_utils::test_channel!(3); + let (tx_parents, _rx_parents) = test_utils::test_channel!(3); + let (_tx_consensus_round_updates, rx_consensus_round_updates) = watch::channel(0u64); + + // Create test stores. + let (header_store, certificates_store, payload_store) = create_db_stores(); + let proposer_store = ProposerStore::new_for_tests(); + + // add a header to store + let builder = types::HeaderBuilder::default(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let primary = fixture.authorities().next().unwrap(); + let header = builder + .author(name.clone()) + .round(1) + .epoch(0) + .parents([CertificateDigest::default()].iter().cloned().collect()) + .with_payload_batch(fixture_batch_with_transactions(10), 0) + .build(primary.keypair()) + .unwrap(); + + let result = proposer_store.write_last_proposed(&header); + assert!(result.is_ok()); + + // Make a synchronizer for the core. + let synchronizer = Synchronizer::new( + name.clone(), + &committee, + certificates_store.clone(), + payload_store.clone(), + /* tx_header_waiter */ tx_sync_headers.clone(), + tx_certificate_waiter.clone(), + None, + ); + + let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); + + let fake_primary_addr = network::multiaddr_to_address(other_primary.address()).unwrap(); + let fake_route = + anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(NetworkProxy { + proposer_store, + })); + let fake_server_network = anemo::Network::bind(fake_primary_addr.clone()) + .server_name("narwhal") + .private_key( + other_primary + .network_keypair() + .copy() + .private() + .0 + .to_bytes(), + ) + .start(fake_route) + .unwrap(); + let client_network = test_utils::test_network(primary.network_keypair(), primary.address()); + client_network + .connect_with_peer_id(fake_primary_addr, fake_server_network.peer_id()) + .await + .unwrap(); + + // Spawn the core. + let _core_handle = Core::spawn( + name.clone(), + committee.clone(), + worker_cache.clone(), + header_store.clone(), + certificates_store.clone(), + create_test_vote_store(), + synchronizer, + signature_service.clone(), + rx_consensus_round_updates.clone(), + /* gc_depth */ 50, + rx_reconfigure.clone(), + /* rx_primaries */ rx_primary_messages, + /* rx_header_waiter */ rx_headers_loopback, + /* rx_certificate_waiter */ rx_certificates_loopback, + /* rx_proposer */ rx_headers, + tx_consensus, + /* tx_proposer */ tx_parents, + metrics.clone(), + P2pNetwork::new(client_network.clone()), + ); + + tokio::time::sleep(Duration::from_secs(5)).await; + + // assert that the header was processed + let mut m = HashMap::new(); + m.insert("epoch", "0"); + m.insert("source", "own"); + assert_eq!( + metrics + .unique_headers_received + .get_metric_with(&m) + .unwrap() + .get(), + 1 + ); +} + #[tokio::test] async fn shutdown_core() { let fixture = CommitteeFixture::builder().build(); diff --git a/narwhal/storage/src/proposer_store.rs b/narwhal/storage/src/proposer_store.rs index 6fdbf4a5a75ab..e44182c9dd507 100644 --- a/narwhal/storage/src/proposer_store.rs +++ b/narwhal/storage/src/proposer_store.rs @@ -47,7 +47,7 @@ mod test { use test_utils::{fixture_batch_with_transactions, CommitteeFixture}; use types::{CertificateDigest, Header, Round}; - fn create_header_for_round(round: Round) -> Header { + pub fn create_header_for_round(round: Round) -> Header { let builder = types::HeaderBuilder::default(); let fixture = CommitteeFixture::builder().randomize_ports(true).build(); let primary = fixture.authorities().next().unwrap();