Skip to content

Commit

Permalink
review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Nov 4, 2022
1 parent 63a7fdd commit 4aba351
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 145 deletions.
9 changes: 4 additions & 5 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use std::collections::HashMap;
use std::time::Duration;
use tokio::{runtime::Handle, task::JoinHandle};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest,
WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, LatestHeaderRequest,
LatestHeaderResponse, PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient,
RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage,
WorkerToPrimaryClient, WorkerToWorkerClient,
};
Expand Down Expand Up @@ -226,7 +225,7 @@ impl PrimaryToPrimaryRpc for anemo::Network {
async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
request: impl anemo::types::request::IntoRequest<LatestHeaderRequest> + Send,
) -> Result<LatestHeaderResponse> {
let peer_id = PeerId(peer.0.to_bytes());
let peer = self
Expand Down
6 changes: 3 additions & 3 deletions narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crypto::NetworkPublicKey;
use rand::prelude::{SliceRandom, SmallRng};
use tokio::task::JoinHandle;
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, LatestHeaderRequest,
LatestHeaderResponse,
};

pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
Expand Down Expand Up @@ -106,7 +106,7 @@ pub trait PrimaryToPrimaryRpc {
async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
request: impl anemo::types::request::IntoRequest<LatestHeaderRequest> + Send,
) -> Result<LatestHeaderResponse>;
}

Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn run_client(
}
}
}
#[ignore]

#[tokio::test]
async fn restart() {
telemetry_subscribers::init_for_testing();
Expand Down
36 changes: 19 additions & 17 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use fastcrypto::{hash::Hash as _, SignatureService};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -24,7 +25,7 @@ use std::{
use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use tokio::{sync::watch, task::JoinHandle, time};
use tokio::{sync::watch, task::JoinHandle};

use tracing::{debug, error, info, instrument, trace, warn};
use types::{
Expand All @@ -39,6 +40,8 @@ use types::{
#[path = "tests/core_tests.rs"]
pub mod core_tests;

const LATEST_HEADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);

pub struct Core {
/// The public key of this primary.
name: PublicKey,
Expand Down Expand Up @@ -190,31 +193,24 @@ impl Core {
.map(|(_, _, network_key)| network_key)
.collect();

let network = P2pNetwork::new(self.network.network());
let mut header_futures = FuturesUnordered::new();
let request = LatestHeaderRequest {};

for peer in peers.iter() {
let network = network.network();
let request = request.clone();
let network = self.network.network();
let request = anemo::Request::new(LatestHeaderRequest {})
.with_timeout(LATEST_HEADER_REQUEST_TIMEOUT);

header_futures.push(async move {
let _ = &network;
network.get_latest_header(peer, request).await
});
header_futures.push(async move { network.get_latest_header(peer, request).await });
}

let mut latest_headers = Vec::new();
while let Some(res) = header_futures.next().await {
match res {
Ok(response) => {
if let Some(header) = response.header {
let result = self.process_header(&header).await;
if result.is_err() {
error!(
"error on recovery path processing latest header: {:?}",
result.err()
);
}
latest_headers.push(header);
} else {
trace!("peer's latest header was None on recovery path")
debug!("peer's latest header was None on recovery path")
}
}
Err(e) => {
Expand All @@ -226,6 +222,12 @@ impl Core {
}
}

for header in latest_headers {
if let Err(err) = self.process_header(&header).await {
error!("error on recovery path processing latest header: {:?}", err);
}
}

self
}

Expand Down
14 changes: 5 additions & 9 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,11 @@ pub use types::PrimaryMessage;
use types::{
error::DagError,
metered_channel::{channel_with_total, Receiver, Sender},
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer,
PrimaryToPrimaryServer, ReconfigureNotification, Round, RoundVoteDigestPair,
WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary,
WorkerToPrimary, WorkerToPrimary, WorkerToPrimary, WorkerToPrimaryServer,
WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer,
BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, Header, HeaderDigest, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryToPrimary,
PrimaryToPrimaryServer, ReconfigureNotification, RoundVoteDigestPair, WorkerInfoResponse,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down
10 changes: 4 additions & 6 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{
sync::Arc,
time::Duration,
};

use storage::{CertificateStore, ProposerStore};
use test_utils::{temp_dir, CommitteeFixture};
use tokio::{
Expand All @@ -29,11 +28,10 @@ use tokio::{
time::sleep,
};
use types::{
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer,
ReconfigureNotification, Round,
Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimaryServer, ReconfigureNotification, Round,
};

pub struct NetworkProxy {
Expand Down
146 changes: 48 additions & 98 deletions narwhal/primary/src/tests/core_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,12 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::common::{create_db_stores, create_test_vote_store};
use anemo::{async_trait, types::PeerInfo, PeerId};
use anemo::{types::PeerInfo, PeerId};
use fastcrypto::traits::KeyPair;
use prometheus::Registry;
use storage::ProposerStore;
use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToPrimaryMockServer};
use tokio::time::Duration;
use types::{
CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse, GetCertificatesRequest,
GetCertificatesResponse, Header, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, Vote,
};

pub struct NetworkProxy {
proposer_store: ProposerStore,
}

#[async_trait]
impl PrimaryToPrimary for NetworkProxy {
async fn send_message(
&self,
request: anemo::Request<PrimaryMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
unimplemented!(
"FetchCertificateProxy::send_message() is unimplemented!! {:#?}",
request
);
}
async fn get_certificates(
&self,
_request: anemo::Request<GetCertificatesRequest>,
) -> Result<anemo::Response<GetCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}
async fn fetch_certificates(
&self,
_request: anemo::Request<FetchCertificatesRequest>,
) -> Result<anemo::Response<FetchCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
let latest_header = self.proposer_store.get_last_proposed().map_err(|e| {
anemo::rpc::Status::internal(format!(
"error fetching latest proposed header from store: {e}"
))
})?;
Ok(anemo::Response::new(LatestHeaderResponse {
header: latest_header,
}))
}
}
use types::{CertificateDigest, Header, LatestHeaderResponse, MockPrimaryToPrimary, Vote};

#[tokio::test]
async fn process_header() {
Expand Down Expand Up @@ -1090,7 +1035,6 @@ async fn recover_core_latest_headers() {
let worker_cache = fixture.shared_worker_cache();
let primary = fixture.authorities().last().unwrap();
let name = primary.public_key();
let other_primary = fixture.authorities().nth(1).unwrap();
let signature_service = SignatureService::new(primary.keypair().copy());

let (_tx_reconfigure, rx_reconfigure) =
Expand All @@ -1107,23 +1051,6 @@ async fn recover_core_latest_headers() {

// Create test stores.
let (header_store, certificates_store, payload_store) = create_db_stores();
let proposer_store = ProposerStore::new_for_tests();

// add a header to store
let builder = types::HeaderBuilder::default();
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let primary = fixture.authorities().next().unwrap();
let header = builder
.author(name.clone())
.round(1)
.epoch(0)
.parents([CertificateDigest::default()].iter().cloned().collect())
.with_payload_batch(fixture_batch_with_transactions(10), 0)
.build(primary.keypair())
.unwrap();

let result = proposer_store.write_last_proposed(&header);
assert!(result.is_ok());

// Make a synchronizer for the core.
let synchronizer = Synchronizer::new(
Expand All @@ -1138,29 +1065,52 @@ async fn recover_core_latest_headers() {

let metrics = Arc::new(PrimaryMetrics::new(&Registry::new()));

let fake_primary_addr = network::multiaddr_to_address(other_primary.address()).unwrap();
let fake_route =
anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(NetworkProxy {
proposer_store,
}));
let fake_server_network = anemo::Network::bind(fake_primary_addr.clone())
let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap()).unwrap();
println!("New primary added: {:?}", own_address);
let network = anemo::Network::bind(own_address)
.server_name("narwhal")
.private_key(
other_primary
.network_keypair()
.copy()
.private()
.0
.to_bytes(),
)
.start(fake_route)
.unwrap();
let client_network = test_utils::test_network(primary.network_keypair(), primary.address());
client_network
.connect_with_peer_id(fake_primary_addr, fake_server_network.peer_id())
.await
.private_key(primary.network_keypair().copy().private().0.to_bytes())
.start(anemo::Router::new())
.unwrap();

let mut primary_networks = Vec::new();
let fixture_authorities = fixture.authorities().filter(|a| a.public_key() != name);
for primary in fixture_authorities {
let address = committee.primary(&primary.public_key()).unwrap();

let mut mock_server = MockPrimaryToPrimary::new();
mock_server
.expect_get_latest_header()
.returning(move |_request| {
let builder = types::HeaderBuilder::default();
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
let primary = fixture.authorities().next().unwrap();

let header = builder
.author(primary.public_key())
.round(1)
.epoch(0)
.parents([CertificateDigest::default()].iter().cloned().collect())
.with_payload_batch(fixture_batch_with_transactions(10), 0)
.build(primary.keypair())
.unwrap();
Ok(anemo::Response::new(LatestHeaderResponse {
header: Some(header),
}))
});

let routes =
anemo::Router::new().add_rpc_service(types::PrimaryToPrimaryServer::new(mock_server));
primary_networks.push(primary.new_network(routes));

let address = network::multiaddr_to_address(&address).unwrap();
let peer_id = PeerId(primary.network_keypair().public().0.to_bytes());
network
.connect_with_peer_id(address, peer_id)
.await
.unwrap();
}

// Spawn the core.
let _core_handle = Core::spawn(
name.clone(),
Expand All @@ -1181,22 +1131,22 @@ async fn recover_core_latest_headers() {
tx_consensus,
/* tx_proposer */ tx_parents,
metrics.clone(),
P2pNetwork::new(client_network.clone()),
P2pNetwork::new(network.clone()),
);

tokio::time::sleep(Duration::from_secs(5)).await;

// assert that the header was processed
let mut m = HashMap::new();
m.insert("epoch", "0");
m.insert("source", "own");
m.insert("source", "other");
assert_eq!(
metrics
.unique_headers_received
.get_metric_with(&m)
.unwrap()
.get(),
1
3
);
}

Expand Down
12 changes: 6 additions & 6 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::info;
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer,
PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, Round,
SequenceNumber, Transaction, Vote, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer,
FetchCertificatesResponse, Header, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage,
WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker,
WorkerToWorkerServer,
};

pub mod cluster;
Expand Down

0 comments on commit 4aba351

Please sign in to comment.