From 4aba351980d6c4eec833d120fa07f4ed2c1ca71e Mon Sep 17 00:00:00 2001 From: Laura Makdah Date: Thu, 3 Nov 2022 14:41:04 -0700 Subject: [PATCH] review updates --- narwhal/network/src/p2p.rs | 9 +- narwhal/network/src/traits.rs | 6 +- narwhal/node/tests/reconfigure.rs | 2 +- narwhal/primary/src/core.rs | 36 +++-- narwhal/primary/src/primary.rs | 14 +- .../src/tests/certificate_waiter_tests.rs | 10 +- narwhal/primary/src/tests/core_tests.rs | 146 ++++++------------ narwhal/test-utils/src/lib.rs | 12 +- 8 files changed, 90 insertions(+), 145 deletions(-) diff --git a/narwhal/network/src/p2p.rs b/narwhal/network/src/p2p.rs index b7ff87d545bdb..4c3ff9e59cdfa 100644 --- a/narwhal/network/src/p2p.rs +++ b/narwhal/network/src/p2p.rs @@ -16,10 +16,9 @@ use std::collections::HashMap; use std::time::Duration; use tokio::{runtime::Handle, task::JoinHandle}; use types::{ - Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, - GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse, - PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, - WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage, + Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, LatestHeaderRequest, + LatestHeaderResponse, PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient, + RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToWorkerClient, }; @@ -226,7 +225,7 @@ impl PrimaryToPrimaryRpc for anemo::Network { async fn get_latest_header( &self, peer: &NetworkPublicKey, - request: LatestHeaderRequest, + request: impl anemo::types::request::IntoRequest + Send, ) -> Result { let peer_id = PeerId(peer.0.to_bytes()); let peer = self diff --git a/narwhal/network/src/traits.rs b/narwhal/network/src/traits.rs index cb7e77c007960..d3409d85a2719 100644 --- a/narwhal/network/src/traits.rs +++ b/narwhal/network/src/traits.rs @@ -7,8 +7,8 @@ use crypto::NetworkPublicKey; use rand::prelude::{SliceRandom, SmallRng}; use tokio::task::JoinHandle; use types::{ - Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, - GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse, + Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, LatestHeaderRequest, + LatestHeaderResponse, }; pub trait UnreliableNetwork { @@ -106,7 +106,7 @@ pub trait PrimaryToPrimaryRpc { async fn get_latest_header( &self, peer: &NetworkPublicKey, - request: LatestHeaderRequest, + request: impl anemo::types::request::IntoRequest + Send, ) -> Result; } diff --git a/narwhal/node/tests/reconfigure.rs b/narwhal/node/tests/reconfigure.rs index 0227bfd3eb89b..14b753a22e1c0 100644 --- a/narwhal/node/tests/reconfigure.rs +++ b/narwhal/node/tests/reconfigure.rs @@ -159,7 +159,7 @@ async fn run_client( } } } -#[ignore] + #[tokio::test] async fn restart() { telemetry_subscribers::init_for_testing(); diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index 68b054e4a79b1..45e352d1a5ac1 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -16,6 +16,7 @@ use fastcrypto::{hash::Hash as _, SignatureService}; use futures::stream::FuturesUnordered; use futures::StreamExt; use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork}; +use std::time::Duration; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -24,7 +25,7 @@ use std::{ use storage::CertificateStore; use store::Store; use sui_metrics::spawn_monitored_task; -use tokio::{sync::watch, task::JoinHandle, time}; +use tokio::{sync::watch, task::JoinHandle}; use tracing::{debug, error, info, instrument, trace, warn}; use types::{ @@ -39,6 +40,8 @@ use types::{ #[path = "tests/core_tests.rs"] pub mod core_tests; +const LATEST_HEADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); + pub struct Core { /// The public key of this primary. name: PublicKey, @@ -190,31 +193,24 @@ impl Core { .map(|(_, _, network_key)| network_key) .collect(); - let network = P2pNetwork::new(self.network.network()); let mut header_futures = FuturesUnordered::new(); - let request = LatestHeaderRequest {}; + for peer in peers.iter() { - let network = network.network(); - let request = request.clone(); + let network = self.network.network(); + let request = anemo::Request::new(LatestHeaderRequest {}) + .with_timeout(LATEST_HEADER_REQUEST_TIMEOUT); - header_futures.push(async move { - let _ = &network; - network.get_latest_header(peer, request).await - }); + header_futures.push(async move { network.get_latest_header(peer, request).await }); } + + let mut latest_headers = Vec::new(); while let Some(res) = header_futures.next().await { match res { Ok(response) => { if let Some(header) = response.header { - let result = self.process_header(&header).await; - if result.is_err() { - error!( - "error on recovery path processing latest header: {:?}", - result.err() - ); - } + latest_headers.push(header); } else { - trace!("peer's latest header was None on recovery path") + debug!("peer's latest header was None on recovery path") } } Err(e) => { @@ -226,6 +222,12 @@ impl Core { } } + for header in latest_headers { + if let Err(err) = self.process_header(&header).await { + error!("error on recovery path processing latest header: {:?}", err); + } + } + self } diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 38885bfadb055..703fe4210285d 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -52,15 +52,11 @@ pub use types::PrimaryMessage; use types::{ error::DagError, metered_channel::{channel_with_total, Receiver, Sender}, - BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest, - FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, - HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary, - PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer, - PrimaryToPrimaryServer, ReconfigureNotification, Round, RoundVoteDigestPair, - WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, - WorkerToPrimary, WorkerToPrimary, WorkerToPrimary, WorkerToPrimaryServer, - WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer, + BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, + FetchCertificatesResponse, Header, HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, + PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryToPrimary, + PrimaryToPrimaryServer, ReconfigureNotification, RoundVoteDigestPair, WorkerInfoResponse, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimaryServer, }; #[cfg(any(test))] diff --git a/narwhal/primary/src/tests/certificate_waiter_tests.rs b/narwhal/primary/src/tests/certificate_waiter_tests.rs index b9bb66e1a8571..f8cfc48eb2299 100644 --- a/narwhal/primary/src/tests/certificate_waiter_tests.rs +++ b/narwhal/primary/src/tests/certificate_waiter_tests.rs @@ -18,7 +18,6 @@ use std::{ sync::Arc, time::Duration, }; - use storage::{CertificateStore, ProposerStore}; use test_utils::{temp_dir, CommitteeFixture}; use tokio::{ @@ -29,11 +28,10 @@ use tokio::{ time::sleep, }; use types::{ - BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest, - FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, - HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, - ReconfigureNotification, Round, + Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, + FetchCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse, + PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, + PrimaryToPrimaryServer, ReconfigureNotification, Round, }; pub struct NetworkProxy { diff --git a/narwhal/primary/src/tests/core_tests.rs b/narwhal/primary/src/tests/core_tests.rs index bbcf9616015f1..ee81e425256c9 100644 --- a/narwhal/primary/src/tests/core_tests.rs +++ b/narwhal/primary/src/tests/core_tests.rs @@ -3,67 +3,12 @@ // SPDX-License-Identifier: Apache-2.0 use super::*; use crate::common::{create_db_stores, create_test_vote_store}; -use anemo::{async_trait, types::PeerInfo, PeerId}; +use anemo::{types::PeerInfo, PeerId}; use fastcrypto::traits::KeyPair; use prometheus::Registry; -use storage::ProposerStore; use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToPrimaryMockServer}; use tokio::time::Duration; -use types::{ - CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse, GetCertificatesRequest, - GetCertificatesResponse, Header, LatestHeaderResponse, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryToPrimary, Vote, -}; - -pub struct NetworkProxy { - proposer_store: ProposerStore, -} - -#[async_trait] -impl PrimaryToPrimary for NetworkProxy { - async fn send_message( - &self, - request: anemo::Request, - ) -> Result, anemo::rpc::Status> { - unimplemented!( - "FetchCertificateProxy::send_message() is unimplemented!! {:#?}", - request - ); - } - async fn get_certificates( - &self, - _request: anemo::Request, - ) -> Result, anemo::rpc::Status> { - unimplemented!() - } - async fn fetch_certificates( - &self, - _request: anemo::Request, - ) -> Result, anemo::rpc::Status> { - unimplemented!() - } - - async fn get_payload_availability( - &self, - _request: anemo::Request, - ) -> Result, anemo::rpc::Status> { - unimplemented!() - } - - async fn get_latest_header( - &self, - _request: anemo::Request, - ) -> Result, anemo::rpc::Status> { - let latest_header = self.proposer_store.get_last_proposed().map_err(|e| { - anemo::rpc::Status::internal(format!( - "error fetching latest proposed header from store: {e}" - )) - })?; - Ok(anemo::Response::new(LatestHeaderResponse { - header: latest_header, - })) - } -} +use types::{CertificateDigest, Header, LatestHeaderResponse, MockPrimaryToPrimary, Vote}; #[tokio::test] async fn process_header() { @@ -1090,7 +1035,6 @@ async fn recover_core_latest_headers() { let worker_cache = fixture.shared_worker_cache(); let primary = fixture.authorities().last().unwrap(); let name = primary.public_key(); - let other_primary = fixture.authorities().nth(1).unwrap(); let signature_service = SignatureService::new(primary.keypair().copy()); let (_tx_reconfigure, rx_reconfigure) = @@ -1107,23 +1051,6 @@ async fn recover_core_latest_headers() { // Create test stores. let (header_store, certificates_store, payload_store) = create_db_stores(); - let proposer_store = ProposerStore::new_for_tests(); - - // add a header to store - let builder = types::HeaderBuilder::default(); - let fixture = CommitteeFixture::builder().randomize_ports(true).build(); - let primary = fixture.authorities().next().unwrap(); - let header = builder - .author(name.clone()) - .round(1) - .epoch(0) - .parents([CertificateDigest::default()].iter().cloned().collect()) - .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(primary.keypair()) - .unwrap(); - - let result = proposer_store.write_last_proposed(&header); - assert!(result.is_ok()); // Make a synchronizer for the core. let synchronizer = Synchronizer::new( @@ -1138,29 +1065,52 @@ async fn recover_core_latest_headers() { let metrics = Arc::new(PrimaryMetrics::new(&Registry::new())); - let fake_primary_addr = network::multiaddr_to_address(other_primary.address()).unwrap(); - let fake_route = - anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(NetworkProxy { - proposer_store, - })); - let fake_server_network = anemo::Network::bind(fake_primary_addr.clone()) + let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap(); + println!("New primary added: {:?}", own_address); + let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key( - other_primary - .network_keypair() - .copy() - .private() - .0 - .to_bytes(), - ) - .start(fake_route) - .unwrap(); - let client_network = test_utils::test_network(primary.network_keypair(), primary.address()); - client_network - .connect_with_peer_id(fake_primary_addr, fake_server_network.peer_id()) - .await + .private_key(primary.network_keypair().copy().private().0.to_bytes()) + .start(anemo::Router::new()) .unwrap(); + let mut primary_networks = Vec::new(); + let fixture_authorities = fixture.authorities().filter(|a| a.public_key() != name); + for primary in fixture_authorities { + let address = committee.primary(&primary.public_key()).unwrap(); + + let mut mock_server = MockPrimaryToPrimary::new(); + mock_server + .expect_get_latest_header() + .returning(move |_request| { + let builder = types::HeaderBuilder::default(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let primary = fixture.authorities().next().unwrap(); + + let header = builder + .author(primary.public_key()) + .round(1) + .epoch(0) + .parents([CertificateDigest::default()].iter().cloned().collect()) + .with_payload_batch(fixture_batch_with_transactions(10), 0) + .build(primary.keypair()) + .unwrap(); + Ok(anemo::Response::new(LatestHeaderResponse { + header: Some(header), + })) + }); + + let routes = + anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(mock_server)); + primary_networks.push(primary.new_network(routes)); + + let address = network::multiaddr_to_address(&address).unwrap(); + let peer_id = PeerId(primary.network_keypair().public().0.to_bytes()); + network + .connect_with_peer_id(address, peer_id) + .await + .unwrap(); + } + // Spawn the core. let _core_handle = Core::spawn( name.clone(), @@ -1181,7 +1131,7 @@ async fn recover_core_latest_headers() { tx_consensus, /* tx_proposer */ tx_parents, metrics.clone(), - P2pNetwork::new(client_network.clone()), + P2pNetwork::new(network.clone()), ); tokio::time::sleep(Duration::from_secs(5)).await; @@ -1189,14 +1139,14 @@ async fn recover_core_latest_headers() { // assert that the header was processed let mut m = HashMap::new(); m.insert("epoch", "0"); - m.insert("source", "own"); + m.insert("source", "other"); assert_eq!( metrics .unique_headers_received .get_metric_with(&m) .unwrap() .get(), - 1 + 3 ); } diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 9a1043980ada8..dfe9a97d63add 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -26,12 +26,12 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::info; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, - FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, - HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, - PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, Round, - SequenceNumber, Transaction, Vote, WorkerBatchMessage, WorkerDeleteBatchesMessage, - WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer, + FetchCertificatesResponse, Header, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse, + PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, + PrimaryToPrimaryServer, PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, + RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage, + WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, + WorkerToWorkerServer, }; pub mod cluster;