Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Nov 4, 2022
1 parent 4460d9e commit 63a7fdd
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 33 deletions.
2 changes: 1 addition & 1 deletion narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn run_client(
}
}
}

#[ignore]
#[tokio::test]
async fn restart() {
telemetry_subscribers::init_for_testing();
Expand Down
41 changes: 17 additions & 24 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use fastcrypto::{hash::Hash as _, SignatureService};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -26,6 +25,7 @@ use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use tokio::{sync::watch, task::JoinHandle, time};

use tracing::{debug, error, info, instrument, trace, warn};
use types::{
ensure,
Expand All @@ -39,8 +39,6 @@ use types::{
#[path = "tests/core_tests.rs"]
pub mod core_tests;

const RECOVERY_REQUEST_TIMEOUT_SECS: u64 = 60;

pub struct Core {
/// The public key of this primary.
name: PublicKey,
Expand Down Expand Up @@ -192,7 +190,7 @@ impl Core {
.map(|(_, _, network_key)| network_key)
.collect();

let network = P2pNetwork::new(self.network.network().clone());
let network = P2pNetwork::new(self.network.network());
let mut header_futures = FuturesUnordered::new();
let request = LatestHeaderRequest {};
for peer in peers.iter() {
Expand All @@ -204,32 +202,27 @@ impl Core {
network.get_latest_header(peer, request).await
});
}
let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS);
let mut interval = Box::pin(time::sleep(request_interval));
loop {
tokio::select! {
res = header_futures.next() => {
match res {
Some(Ok(response)) => {
while let Some(res) = header_futures.next().await {
match res {
Ok(response) => {
if let Some(header) = response.header {
self.process_header(&header);
let result = self.process_header(&header).await;
if result.is_err() {
error!(
"error on recovery path processing latest header: {:?}",
result.err()
);
}
} else {
trace!("peer's latest header was None on recovery path")
}
}
Some(Err(e)) => {
error!(
Err(e) => {
debug!(
"failed to get latest header from peer as recovery on startup: {:?}",
e
)
}
None => {
break;
}
}
}
_ = &mut interval => {
debug!("timeout was passed when requesting recovery header from peer");
break;
}
}
}

Expand Down Expand Up @@ -742,7 +735,7 @@ impl Core {
// Cleanup internal state.
let keys = self.vote_digest_store.iter(None).await.into_keys();
if let Err(e) = self.vote_digest_store.remove_all(keys).await {
error!("Error in change epoch when clearing vote store {}", e);
error!("Error in change epoch when clearing vote store {:?}", e);
}
self.processing.clear();
self.certificates_aggregators.clear();
Expand Down
20 changes: 15 additions & 5 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::{
sync::Arc,
time::Duration,
};
use storage::CertificateStore;

use storage::{CertificateStore, ProposerStore};
use test_utils::{temp_dir, CommitteeFixture};
use tokio::{
sync::{
Expand All @@ -35,13 +36,14 @@ use types::{
ReconfigureNotification, Round,
};

struct FetchCertificateProxy {
pub struct NetworkProxy {
request: Sender<FetchCertificatesRequest>,
response: Arc<Mutex<Receiver<FetchCertificatesResponse>>>,
proposer_store: ProposerStore,
}

#[async_trait]
impl PrimaryToPrimary for FetchCertificateProxy {
impl PrimaryToPrimary for NetworkProxy {
async fn send_message(
&self,
request: anemo::Request<PrimaryMessage>,
Expand Down Expand Up @@ -75,7 +77,14 @@ impl PrimaryToPrimary for FetchCertificateProxy {
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
Ok(anemo::Response::new(LatestHeaderResponse { header: None }))
let latest_header = self.proposer_store.get_last_proposed().map_err(|e| {
anemo::rpc::Status::internal(format!(
"error fetching latest proposed header from store: {e}"
))
})?;
Ok(anemo::Response::new(LatestHeaderResponse {
header: latest_header,
}))
}
}

Expand Down Expand Up @@ -200,9 +209,10 @@ async fn fetch_certificates_basic() {

let fake_primary_addr = network::multiaddr_to_address(fake_primary.address()).unwrap();
let fake_route =
anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(FetchCertificateProxy {
anemo::Router::new().add_rpc_service(PrimaryToPrimaryServer::new(NetworkProxy {
request: tx_fetch_req,
response: Arc::new(Mutex::new(rx_fetch_resp)),
proposer_store: ProposerStore::new_for_tests(),
}));
let fake_server_network = anemo::Network::bind(fake_primary_addr.clone())
.server_name("narwhal")
Expand Down
176 changes: 174 additions & 2 deletions narwhal/primary/src/tests/core_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,67 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::common::{create_db_stores, create_test_vote_store};
use anemo::{types::PeerInfo, PeerId};
use anemo::{async_trait, types::PeerInfo, PeerId};
use fastcrypto::traits::KeyPair;
use prometheus::Registry;
use storage::ProposerStore;
use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToPrimaryMockServer};
use tokio::time::Duration;
use types::{CertificateDigest, Header, Vote};
use types::{
CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse, GetCertificatesRequest,
GetCertificatesResponse, Header, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, Vote,
};

pub struct NetworkProxy {
proposer_store: ProposerStore,
}

#[async_trait]
impl PrimaryToPrimary for NetworkProxy {
async fn send_message(
&self,
request: anemo::Request<PrimaryMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
unimplemented!(
"FetchCertificateProxy::send_message() is unimplemented!! {:#?}",
request
);
}
async fn get_certificates(
&self,
_request: anemo::Request<GetCertificatesRequest>,
) -> Result<anemo::Response<GetCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}
async fn fetch_certificates(
&self,
_request: anemo::Request<FetchCertificatesRequest>,
) -> Result<anemo::Response<FetchCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
let latest_header = self.proposer_store.get_last_proposed().map_err(|e| {
anemo::rpc::Status::internal(format!(
"error fetching latest proposed header from store: {e}"
))
})?;
Ok(anemo::Response::new(LatestHeaderResponse {
header: latest_header,
}))
}
}

#[tokio::test]
async fn process_header() {
Expand Down Expand Up @@ -1028,6 +1083,123 @@ async fn recover_core_expecting_header_of_previous_round() {
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn recover_core_latest_headers() {
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let committee = fixture.committee();
let worker_cache = fixture.shared_worker_cache();
let primary = fixture.authorities().last().unwrap();
let name = primary.public_key();
let other_primary = fixture.authorities().nth(1).unwrap();
let signature_service = SignatureService::new(primary.keypair().copy());

let (_tx_reconfigure, rx_reconfigure) =
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));
let (tx_sync_headers, _rx_sync_headers) = test_utils::test_channel!(1);
let (tx_certificate_waiter, _rx_certificate_waiter) = test_utils::test_channel!(1);
let (_tx_primary_messages, rx_primary_messages) = test_utils::test_channel!(3);
let (_tx_headers_loopback, rx_headers_loopback) = test_utils::test_channel!(1);
let (_tx_certificates_loopback, rx_certificates_loopback) = test_utils::test_channel!(1);
let (_tx_headers, rx_headers) = test_utils::test_channel!(1);
let (tx_consensus, _rx_consensus) = test_utils::test_channel!(3);
let (tx_parents, _rx_parents) = test_utils::test_channel!(3);
let (_tx_consensus_round_updates, rx_consensus_round_updates) = watch::channel(0u64);

// Create test stores.
let (header_store, certificates_store, payload_store) = create_db_stores();
let proposer_store = ProposerStore::new_for_tests();

// add a header to store
let builder = types::HeaderBuilder::default();
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let primary = fixture.authorities().next().unwrap();
let header = builder
.author(name.clone())
.round(1)
.epoch(0)
.parents([CertificateDigest::default()].iter().cloned().collect())
.with_payload_batch(fixture_batch_with_transactions(10), 0)
.build(primary.keypair())
.unwrap();

let result = proposer_store.write_last_proposed(&header);
assert!(result.is_ok());

// Make a synchronizer for the core.
let synchronizer = Synchronizer::new(
name.clone(),
&committee,
certificates_store.clone(),
payload_store.clone(),
/* tx_header_waiter */ tx_sync_headers.clone(),
tx_certificate_waiter.clone(),
None,
);

let metrics = Arc::new(PrimaryMetrics::new(&Registry::new()));

let fake_primary_addr = network::multiaddr_to_address(other_primary.address()).unwrap();
let fake_route =
anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(NetworkProxy {
proposer_store,
}));
let fake_server_network = anemo::Network::bind(fake_primary_addr.clone())
.server_name("narwhal")
.private_key(
other_primary
.network_keypair()
.copy()
.private()
.0
.to_bytes(),
)
.start(fake_route)
.unwrap();
let client_network = test_utils::test_network(primary.network_keypair(), primary.address());
client_network
.connect_with_peer_id(fake_primary_addr, fake_server_network.peer_id())
.await
.unwrap();

// Spawn the core.
let _core_handle = Core::spawn(
name.clone(),
committee.clone(),
worker_cache.clone(),
header_store.clone(),
certificates_store.clone(),
create_test_vote_store(),
synchronizer,
signature_service.clone(),
rx_consensus_round_updates.clone(),
/* gc_depth */ 50,
rx_reconfigure.clone(),
/* rx_primaries */ rx_primary_messages,
/* rx_header_waiter */ rx_headers_loopback,
/* rx_certificate_waiter */ rx_certificates_loopback,
/* rx_proposer */ rx_headers,
tx_consensus,
/* tx_proposer */ tx_parents,
metrics.clone(),
P2pNetwork::new(client_network.clone()),
);

tokio::time::sleep(Duration::from_secs(5)).await;

// assert that the header was processed
let mut m = HashMap::new();
m.insert("epoch", "0");
m.insert("source", "own");
assert_eq!(
metrics
.unique_headers_received
.get_metric_with(&m)
.unwrap()
.get(),
1
);
}

#[tokio::test]
async fn shutdown_core() {
let fixture = CommitteeFixture::builder().build();
Expand Down
2 changes: 1 addition & 1 deletion narwhal/storage/src/proposer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ mod test {
use test_utils::{fixture_batch_with_transactions, CommitteeFixture};
use types::{CertificateDigest, Header, Round};

fn create_header_for_round(round: Round) -> Header {
pub fn create_header_for_round(round: Round) -> Header {
let builder = types::HeaderBuilder::default();
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let primary = fixture.authorities().next().unwrap();
Expand Down

0 comments on commit 63a7fdd

Please sign in to comment.