Skip to content

Commit

Permalink
Sui v0.15.0 version bump (#5765)
Browse files Browse the repository at this point in the history
  • Loading branch information
ebmifa authored and lanvidr committed Nov 4, 2022
1 parent 402e090 commit b556120
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 29 deletions.
28 changes: 23 additions & 5 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
traits::{Lucky, ReliableNetwork, UnreliableNetwork},
BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY,
};
use anemo::PeerId;
use anemo::{Peer, PeerId, Response};
use anyhow::format_err;
use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -16,10 +16,12 @@ use std::collections::HashMap;
use std::time::Duration;
use tokio::{runtime::Handle, task::JoinHandle};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse, PrimaryMessage,
PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage,
WorkerDeleteBatchesMessage, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToPrimaryClient,
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
PrimaryMessage, PrimaryToPrimaryClient, PrimaryToPrimaryClient, PrimaryToWorkerClient,
PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToPrimaryClient, WorkerToWorkerClient,
WorkerToWorkerClient,
};

Expand Down Expand Up @@ -221,6 +223,22 @@ impl PrimaryToPrimaryRpc for anemo::Network {
.map_err(|e| format_err!("Network error {:?}", e))?;
Ok(response.into_body())
}

async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse> {
let peer_id = PeerId(peer.0.to_bytes());
let peer = self
.peer(peer_id)
.ok_or_else(|| format_err!("Network has no connection with peer {peer_id}"))?;
let response = PrimaryToPrimaryClient::new(peer)
.get_latest_header(request)
.await
.map_err(|e| format_err!("Network error {:?}", e))?;
Ok(response.into_body())
}
}

//
Expand Down
11 changes: 10 additions & 1 deletion narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use async_trait::async_trait;
use crypto::NetworkPublicKey;
use rand::prelude::{SliceRandom, SmallRng};
use tokio::task::JoinHandle;
use types::{Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
};

pub trait UnreliableNetwork<Request: Clone + Send + Sync> {
type Response: Clone + Send + Sync;
Expand Down Expand Up @@ -99,6 +102,12 @@ pub trait PrimaryToPrimaryRpc {
peer: &NetworkPublicKey,
request: FetchCertificatesRequest,
) -> Result<FetchCertificatesResponse>;

async fn get_latest_header(
&self,
peer: &NetworkPublicKey,
request: LatestHeaderRequest,
) -> Result<LatestHeaderResponse>;
}

#[async_trait]
Expand Down
70 changes: 62 additions & 8 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ use crate::{
primary::PrimaryMessage,
synchronizer::Synchronizer,
};

use async_recursion::async_recursion;
use config::{Committee, Epoch, SharedWorkerCache};
use crypto::{PublicKey, Signature};
use crypto::{NetworkPublicKey, PublicKey, Signature};
use fastcrypto::{hash::Hash as _, SignatureService};
use network::{CancelOnDropHandler, P2pNetwork, ReliableNetwork};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{
CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork, UnreliableNetwork,
};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -21,23 +27,21 @@ use std::{
use storage::CertificateStore;
use store::Store;
use sui_metrics::spawn_monitored_task;
use tokio::{sync::watch, task::JoinHandle};
use tokio::{sync::watch, task::JoinHandle, time};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
ensure,
error::{DagError, DagError::StoreError, DagResult},
metered_channel::{Receiver, Sender},
Certificate, Header, HeaderDigest, ReconfigureNotification, Round, RoundVoteDigestPair,
Timestamp, Vote,
Certificate, Header, HeaderDigest, LatestHeaderRequest, PrimaryToPrimaryClient,
ReconfigureNotification, Round, RoundVoteDigestPair, Timestamp, Vote,
};

#[cfg(test)]
#[path = "tests/core_tests.rs"]
pub mod core_tests;

// TODO: enable below.
// Rejects a header if it requires catching up the following number of rounds.
// const MAX_HEADER_ROUND_CATCHUP_THRESHOLD: u64 = 20;
const RECOVERY_REQUEST_TIMEOUT_SECS: u64 = 60;

pub struct Core {
/// The public key of this primary.
Expand Down Expand Up @@ -181,6 +185,56 @@ impl Core {

self.highest_received_round = last_round_number;
self.highest_processed_round = last_round_number;

// Get latest header from all peers and process it.
let peers: Vec<NetworkPublicKey> = self
.committee
.others_primaries(&self.name)
.into_iter()
.map(|(_, _, network_key)| network_key)
.collect();

let network = P2pNetwork::new(self.network.network().clone());
let mut header_futures = FuturesUnordered::new();
let request = LatestHeaderRequest {};
for peer in peers.iter() {
let network = network.network();
let request = request.clone();

header_futures.push(async move {
let _ = &network;
network.get_latest_header(peer, request).await
});
}
let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS);
let interval = Box::pin(time::sleep(request_interval));
loop {
tokio::select! {
res = header_futures.next() => {
match res {
Some(Ok(response)) => {
if let Some(header) = response.header {
self.process_header(&header);
}
}
Some(Err(e)) => {
error!(
"failed to get latest header from peer as recovery on startup: {:?}",
e
)
}
None => {
break;
}
}
}
_ = &mut interval => {
debug!("timeout was passed when requesting recovery header from peer");
break;
}
}
}

self
}

Expand Down
29 changes: 24 additions & 5 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ pub use types::PrimaryMessage;
use types::{
error::DagError,
metered_channel::{channel_with_total, Receiver, Sender},
BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, Header, HeaderDigest, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification,
RoundVoteDigestPair, WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerToPrimary, WorkerToPrimaryServer,
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer,
ReconfigureNotification, Round, RoundVoteDigestPair, WorkerInfoResponse,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimary,
WorkerToPrimary, WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down Expand Up @@ -214,6 +217,7 @@ impl Primary {
tx_availability_responses,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: proposer_store.clone(),
});
let worker_service = WorkerToPrimaryServer::new(WorkerReceiverHandler {
tx_our_digests,
Expand Down Expand Up @@ -552,6 +556,7 @@ struct PrimaryReceiverHandler {
tx_availability_responses: Sender<AvailabilityResponse>,
certificate_store: CertificateStore,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
proposer_store: ProposerStore,
}

#[async_trait]
Expand Down Expand Up @@ -688,6 +693,20 @@ impl PrimaryToPrimary for PrimaryReceiverHandler {
payload_availability: result,
}))
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
let latest_header = self.proposer_store.get_last_proposed().map_err(|e| {
anemo::rpc::Status::internal(format!(
"error fetching latest proposed header from store: {e}"
))
})?;
Ok(anemo::Response::new(LatestHeaderResponse {
header: latest_header,
}))
}
}

/// Defines how the network receiver handles incoming workers messages.
Expand Down
3 changes: 2 additions & 1 deletion narwhal/primary/src/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use tracing::{debug, info};
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, Certificate, Header, ReconfigureNotification, Round, Timestamp, TimestampMs,
BatchDigest, Certificate, Header, PrimaryMessage, ReconfigureNotification, Round, Timestamp,
TimestampMs,
};

#[cfg(test)]
Expand Down
16 changes: 13 additions & 3 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use tokio::{
time::sleep,
};
use types::{
Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, PayloadAvailabilityRequest, PayloadAvailabilityResponse,
PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification, Round,
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, ReconfigureNotification, Round,
};

struct FetchCertificateProxy {
Expand Down Expand Up @@ -61,12 +63,20 @@ impl PrimaryToPrimary for FetchCertificateProxy {
self.response.lock().await.recv().await.unwrap(),
))
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
unimplemented!()
}
}

// Simulate consensus committing all certificates written to store, by updating last committed
Expand Down
5 changes: 4 additions & 1 deletion narwhal/primary/src/tests/primary_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
sync::Arc,
time::Duration,
};
use storage::CertificateStore;
use storage::{CertificateStore, ProposerStore};
use store::rocks::DBMap;
use store::Store;
use test_utils::{temp_dir, CommitteeFixture};
Expand Down Expand Up @@ -274,6 +274,7 @@ async fn test_fetch_certificates_handler() {
tx_availability_responses,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

let mut current_round: Vec<_> = Certificate::genesis(&committee)
Expand Down Expand Up @@ -374,6 +375,7 @@ async fn test_process_payload_availability_success() {
tx_availability_responses,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

// GIVEN some mock certificates
Expand Down Expand Up @@ -487,6 +489,7 @@ async fn test_process_payload_availability_when_failures() {
tx_availability_responses,
certificate_store: certificate_store.clone(),
payload_store: payload_store.clone(),
proposer_store: ProposerStore::new_for_tests(),
};

// AND some mock certificates
Expand Down
21 changes: 16 additions & 5 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::info;
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, Header, HeaderBuilder, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer,
PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, Round,
SequenceNumber, Transaction, Vote, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderBuilder, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToWorker,
PrimaryToWorker, PrimaryToWorkerServer, PrimaryToWorkerServer, RequestBatchRequest,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage,
WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker,
WorkerToWorker, WorkerToWorkerServer, WorkerToWorkerServer,
};

pub mod cluster;
Expand Down Expand Up @@ -203,12 +206,20 @@ impl PrimaryToPrimary for PrimaryToPrimaryMockServer {
) -> Result<anemo::Response<FetchCertificatesResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_payload_availability(
&self,
_request: anemo::Request<PayloadAvailabilityRequest>,
) -> Result<anemo::Response<PayloadAvailabilityResponse>, anemo::rpc::Status> {
unimplemented!()
}

async fn get_latest_header(
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
unimplemented!()
}
}

pub struct PrimaryToWorkerMockServer {
Expand Down
10 changes: 10 additions & 0 deletions narwhal/types/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use rustversion::{beta, nightly, stable};
use std::{
env,
path::{Path, PathBuf},
Expand Down Expand Up @@ -77,6 +78,15 @@ fn build_anemo_services(out_dir: &Path) {
.codec_path("anemo::rpc::codec::BincodeCodec")
.build(),
)
.method(
anemo_build::manual::Method::builder()
.name("get_latest_header")
.route_name("GetLatestHeader")
.request_type("crate::LatestHeaderRequest")
.response_type("crate::LatestHeaderResponse")
.codec_path("anemo::rpc::codec::BincodeCodec")
.build(),
)
.build();

let primary_to_worker = anemo_build::manual::Service::builder()
Expand Down
8 changes: 8 additions & 0 deletions narwhal/types/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ impl PayloadAvailabilityResponse {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LatestHeaderRequest {}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LatestHeaderResponse {
pub header: Option<Header>,
}

/// Message to reconfigure worker tasks. This message must be sent by a trusted source.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ReconfigureNotification {
Expand Down

0 comments on commit b556120

Please sign in to comment.