From b55612084c4f90c612e40c62efb7ee6d886751a8 Mon Sep 17 00:00:00 2001 From: Eugene Boguslavsky Date: Tue, 1 Nov 2022 13:37:52 -0700 Subject: [PATCH] Sui v0.15.0 version bump (#5765) --- narwhal/network/src/p2p.rs | 28 ++++++-- narwhal/network/src/traits.rs | 11 ++- narwhal/primary/src/core.rs | 70 ++++++++++++++++--- narwhal/primary/src/primary.rs | 29 ++++++-- narwhal/primary/src/proposer.rs | 3 +- .../src/tests/certificate_waiter_tests.rs | 16 ++++- narwhal/primary/src/tests/primary_tests.rs | 5 +- narwhal/test-utils/src/lib.rs | 21 ++++-- narwhal/types/build.rs | 10 +++ narwhal/types/src/primary.rs | 8 +++ 10 files changed, 172 insertions(+), 29 deletions(-) diff --git a/narwhal/network/src/p2p.rs b/narwhal/network/src/p2p.rs index f02be605a5bfd..2bf2442b5962f 100644 --- a/narwhal/network/src/p2p.rs +++ b/narwhal/network/src/p2p.rs @@ -6,7 +6,7 @@ use crate::{ traits::{Lucky, ReliableNetwork, UnreliableNetwork}, BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY, }; -use anemo::PeerId; +use anemo::{Peer, PeerId, Response}; use anyhow::format_err; use anyhow::Result; use async_trait::async_trait; @@ -16,10 +16,12 @@ use std::collections::HashMap; use std::time::Duration; use tokio::{runtime::Handle, task::JoinHandle}; use types::{ - Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, PrimaryMessage, - PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage, - WorkerDeleteBatchesMessage, WorkerOthersBatchMessage, WorkerOurBatchMessage, - WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToPrimaryClient, + Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, + GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse, + PrimaryMessage, PrimaryToPrimaryClient, PrimaryToPrimaryClient, PrimaryToWorkerClient, + PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerReconfigureMessage, + WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToPrimaryClient, WorkerToWorkerClient, WorkerToWorkerClient, }; @@ -221,6 +223,22 @@ impl PrimaryToPrimaryRpc for anemo::Network { .map_err(|e| format_err!("Network error {:?}", e))?; Ok(response.into_body()) } + + async fn get_latest_header( + &self, + peer: &NetworkPublicKey, + request: LatestHeaderRequest, + ) -> Result { + let peer_id = PeerId(peer.0.to_bytes()); + let peer = self + .peer(peer_id) + .ok_or_else(|| format_err!("Network has no connection with peer {peer_id}"))?; + let response = PrimaryToPrimaryClient::new(peer) + .get_latest_header(request) + .await + .map_err(|e| format_err!("Network error {:?}", e))?; + Ok(response.into_body()) + } } // diff --git a/narwhal/network/src/traits.rs b/narwhal/network/src/traits.rs index c14eb49383a5f..cb7e77c007960 100644 --- a/narwhal/network/src/traits.rs +++ b/narwhal/network/src/traits.rs @@ -6,7 +6,10 @@ use async_trait::async_trait; use crypto::NetworkPublicKey; use rand::prelude::{SliceRandom, SmallRng}; use tokio::task::JoinHandle; -use types::{Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse}; +use types::{ + Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, + GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse, +}; pub trait UnreliableNetwork { type Response: Clone + Send + Sync; @@ -99,6 +102,12 @@ pub trait PrimaryToPrimaryRpc { peer: &NetworkPublicKey, request: FetchCertificatesRequest, ) -> Result; + + async fn get_latest_header( + &self, + peer: &NetworkPublicKey, + request: LatestHeaderRequest, + ) -> Result; } #[async_trait] diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index 689004e585610..6ed8788ed9876 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -8,11 +8,17 @@ use crate::{ primary::PrimaryMessage, synchronizer::Synchronizer, }; + use async_recursion::async_recursion; use config::{Committee, Epoch, SharedWorkerCache}; -use crypto::{PublicKey, Signature}; +use crypto::{NetworkPublicKey, PublicKey, Signature}; use fastcrypto::{hash::Hash as _, SignatureService}; -use network::{CancelOnDropHandler, P2pNetwork, ReliableNetwork}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use network::{ + CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork, UnreliableNetwork, +}; +use std::time::Duration; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -21,23 +27,21 @@ use std::{ use storage::CertificateStore; use store::Store; use sui_metrics::spawn_monitored_task; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{sync::watch, task::JoinHandle, time}; use tracing::{debug, error, info, instrument, trace, warn}; use types::{ ensure, error::{DagError, DagError::StoreError, DagResult}, metered_channel::{Receiver, Sender}, - Certificate, Header, HeaderDigest, ReconfigureNotification, Round, RoundVoteDigestPair, - Timestamp, Vote, + Certificate, Header, HeaderDigest, LatestHeaderRequest, PrimaryToPrimaryClient, + ReconfigureNotification, Round, RoundVoteDigestPair, Timestamp, Vote, }; #[cfg(test)] #[path = "tests/core_tests.rs"] pub mod core_tests; -// TODO: enable below. -// Rejects a header if it requires catching up the following number of rounds. -// const MAX_HEADER_ROUND_CATCHUP_THRESHOLD: u64 = 20; +const RECOVERY_REQUEST_TIMEOUT_SECS: u64 = 60; pub struct Core { /// The public key of this primary. @@ -181,6 +185,56 @@ impl Core { self.highest_received_round = last_round_number; self.highest_processed_round = last_round_number; + + // Get latest header from all peers and process it. + let peers: Vec = self + .committee + .others_primaries(&self.name) + .into_iter() + .map(|(_, _, network_key)| network_key) + .collect(); + + let network = P2pNetwork::new(self.network.network().clone()); + let mut header_futures = FuturesUnordered::new(); + let request = LatestHeaderRequest {}; + for peer in peers.iter() { + let network = network.network(); + let request = request.clone(); + + header_futures.push(async move { + let _ = &network; + network.get_latest_header(peer, request).await + }); + } + let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS); + let interval = Box::pin(time::sleep(request_interval)); + loop { + tokio::select! { + res = header_futures.next() => { + match res { + Some(Ok(response)) => { + if let Some(header) = response.header { + self.process_header(&header); + } + } + Some(Err(e)) => { + error!( + "failed to get latest header from peer as recovery on startup: {:?}", + e + ) + } + None => { + break; + } + } + } + _ = &mut interval => { + debug!("timeout was passed when requesting recovery header from peer"); + break; + } + } + } + self } diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index a3ff4676ad637..d958ff0953afc 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -52,11 +52,14 @@ pub use types::PrimaryMessage; use types::{ error::DagError, metered_channel::{channel_with_total, Receiver, Sender}, - BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, - FetchCertificatesResponse, Header, HeaderDigest, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification, - RoundVoteDigestPair, WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, - WorkerToPrimary, WorkerToPrimaryServer, + BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest, + FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, + HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest, + PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary, + PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer, + ReconfigureNotification, Round, RoundVoteDigestPair, WorkerInfoResponse, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimary, + WorkerToPrimary, WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer, }; #[cfg(any(test))] @@ -214,6 +217,7 @@ impl Primary { tx_availability_responses, certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), + proposer_store: proposer_store.clone(), }); let worker_service = WorkerToPrimaryServer::new(WorkerReceiverHandler { tx_our_digests, @@ -552,6 +556,7 @@ struct PrimaryReceiverHandler { tx_availability_responses: Sender, certificate_store: CertificateStore, payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + proposer_store: ProposerStore, } #[async_trait] @@ -688,6 +693,20 @@ impl PrimaryToPrimary for PrimaryReceiverHandler { payload_availability: result, })) } + + async fn get_latest_header( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + let latest_header = self.proposer_store.get_last_proposed().map_err(|e| { + anemo::rpc::Status::internal(format!( + "error fetching latest proposed header from store: {e}" + )) + })?; + Ok(anemo::Response::new(LatestHeaderResponse { + header: latest_header, + })) + } } /// Defines how the network receiver handles incoming workers messages. diff --git a/narwhal/primary/src/proposer.rs b/narwhal/primary/src/proposer.rs index b4eec0423a428..762011d575b39 100644 --- a/narwhal/primary/src/proposer.rs +++ b/narwhal/primary/src/proposer.rs @@ -18,7 +18,8 @@ use tracing::{debug, info}; use types::{ error::{DagError, DagResult}, metered_channel::{Receiver, Sender}, - BatchDigest, Certificate, Header, ReconfigureNotification, Round, Timestamp, TimestampMs, + BatchDigest, Certificate, Header, PrimaryMessage, ReconfigureNotification, Round, Timestamp, + TimestampMs, }; #[cfg(test)] diff --git a/narwhal/primary/src/tests/certificate_waiter_tests.rs b/narwhal/primary/src/tests/certificate_waiter_tests.rs index 544c14db117a6..9c1976eb42ddd 100644 --- a/narwhal/primary/src/tests/certificate_waiter_tests.rs +++ b/narwhal/primary/src/tests/certificate_waiter_tests.rs @@ -28,9 +28,11 @@ use tokio::{ time::sleep, }; use types::{ - Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, - FetchCertificatesResponse, PayloadAvailabilityRequest, PayloadAvailabilityResponse, - PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification, Round, + BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest, + FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, + HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest, + PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimary, + PrimaryToPrimaryServer, PrimaryToPrimaryServer, ReconfigureNotification, Round, }; struct FetchCertificateProxy { @@ -61,12 +63,20 @@ impl PrimaryToPrimary for FetchCertificateProxy { self.response.lock().await.recv().await.unwrap(), )) } + async fn get_payload_availability( &self, _request: anemo::Request, ) -> Result, anemo::rpc::Status> { unimplemented!() } + + async fn get_latest_header( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!() + } } // Simulate consensus committing all certificates written to store, by updating last committed diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index 8459b6c167990..e7c15dd4665f7 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -18,7 +18,7 @@ use std::{ sync::Arc, time::Duration, }; -use storage::CertificateStore; +use storage::{CertificateStore, ProposerStore}; use store::rocks::DBMap; use store::Store; use test_utils::{temp_dir, CommitteeFixture}; @@ -274,6 +274,7 @@ async fn test_fetch_certificates_handler() { tx_availability_responses, certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), + proposer_store: ProposerStore::new_for_tests(), }; let mut current_round: Vec<_> = Certificate::genesis(&committee) @@ -374,6 +375,7 @@ async fn test_process_payload_availability_success() { tx_availability_responses, certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), + proposer_store: ProposerStore::new_for_tests(), }; // GIVEN some mock certificates @@ -487,6 +489,7 @@ async fn test_process_payload_availability_when_failures() { tx_availability_responses, certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), + proposer_store: ProposerStore::new_for_tests(), }; // AND some mock certificates diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 771d41568fad5..c127a0f4c734a 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -26,11 +26,14 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::info; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest, - FetchCertificatesResponse, Header, HeaderBuilder, PayloadAvailabilityRequest, - PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, - PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, Round, - SequenceNumber, Transaction, Vote, WorkerBatchMessage, WorkerDeleteBatchesMessage, - WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer, + FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, + HeaderBuilder, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse, + PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, + PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToWorker, + PrimaryToWorker, PrimaryToWorkerServer, PrimaryToWorkerServer, RequestBatchRequest, + RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage, + WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, + WorkerToWorker, WorkerToWorkerServer, WorkerToWorkerServer, }; pub mod cluster; @@ -203,12 +206,20 @@ impl PrimaryToPrimary for PrimaryToPrimaryMockServer { ) -> Result, anemo::rpc::Status> { unimplemented!() } + async fn get_payload_availability( &self, _request: anemo::Request, ) -> Result, anemo::rpc::Status> { unimplemented!() } + + async fn get_latest_header( + &self, + _request: anemo::Request, + ) -> Result, anemo::rpc::Status> { + unimplemented!() + } } pub struct PrimaryToWorkerMockServer { diff --git a/narwhal/types/build.rs b/narwhal/types/build.rs index d84253587f773..824dab391c434 100644 --- a/narwhal/types/build.rs +++ b/narwhal/types/build.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use rustversion::{beta, nightly, stable}; use std::{ env, path::{Path, PathBuf}, @@ -77,6 +78,15 @@ fn build_anemo_services(out_dir: &Path) { .codec_path("anemo::rpc::codec::BincodeCodec") .build(), ) + .method( + anemo_build::manual::Method::builder() + .name("get_latest_header") + .route_name("GetLatestHeader") + .request_type("crate::LatestHeaderRequest") + .response_type("crate::LatestHeaderResponse") + .codec_path("anemo::rpc::codec::BincodeCodec") + .build(), + ) .build(); let primary_to_worker = anemo_build::manual::Service::builder() diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index fb2800aea7aab..38aed9a403b44 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -771,6 +771,14 @@ impl PayloadAvailabilityResponse { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LatestHeaderRequest {} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LatestHeaderResponse { + pub header: Option
, +} + /// Message to reconfigure worker tasks. This message must be sent by a trusted source. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum ReconfigureNotification {