Skip to content

Commit

Permalink
Make a request to the other nodes to get latest headers on recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Nov 4, 2022
1 parent b556120 commit 4460d9e
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 30 deletions.
11 changes: 5 additions & 6 deletions narwhal/network/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
traits::{Lucky, ReliableNetwork, UnreliableNetwork},
BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY,
};
use anemo::{Peer, PeerId, Response};
use anemo::PeerId;
use anyhow::format_err;
use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -18,11 +18,10 @@ use tokio::{runtime::Handle, task::JoinHandle};
use types::{
Batch, BatchDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, LatestHeaderRequest, LatestHeaderResponse,
PrimaryMessage, PrimaryToPrimaryClient, PrimaryToPrimaryClient, PrimaryToWorkerClient,
PrimaryToWorkerClient, RequestBatchRequest, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerReconfigureMessage,
WorkerSynchronizeMessage, WorkerToPrimaryClient, WorkerToPrimaryClient, WorkerToWorkerClient,
WorkerToWorkerClient,
PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient, RequestBatchRequest,
WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage,
WorkerToPrimaryClient, WorkerToWorkerClient,
};

fn default_executor() -> BoundedExecutor {
Expand Down
10 changes: 4 additions & 6 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use crypto::{NetworkPublicKey, PublicKey, Signature};
use fastcrypto::{hash::Hash as _, SignatureService};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use network::{
CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork, UnreliableNetwork,
};
use network::{CancelOnDropHandler, P2pNetwork, PrimaryToPrimaryRpc, ReliableNetwork};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
Expand All @@ -33,8 +31,8 @@ use types::{
ensure,
error::{DagError, DagError::StoreError, DagResult},
metered_channel::{Receiver, Sender},
Certificate, Header, HeaderDigest, LatestHeaderRequest, PrimaryToPrimaryClient,
ReconfigureNotification, Round, RoundVoteDigestPair, Timestamp, Vote,
Certificate, Header, HeaderDigest, LatestHeaderRequest, ReconfigureNotification, Round,
RoundVoteDigestPair, Timestamp, Vote,
};

#[cfg(test)]
Expand Down Expand Up @@ -207,7 +205,7 @@ impl Core {
});
}
let request_interval = Duration::from_secs(RECOVERY_REQUEST_TIMEOUT_SECS);
let interval = Box::pin(time::sleep(request_interval));
let mut interval = Box::pin(time::sleep(request_interval));
loop {
tokio::select! {
res = header_futures.next() => {
Expand Down
9 changes: 5 additions & 4 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ use types::{
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryToPrimary, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer,
ReconfigureNotification, Round, RoundVoteDigestPair, WorkerInfoResponse,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimary,
WorkerToPrimary, WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToPrimaryServer,
PrimaryToPrimaryServer, ReconfigureNotification, Round, RoundVoteDigestPair,
WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary,
WorkerToPrimary, WorkerToPrimary, WorkerToPrimary, WorkerToPrimaryServer,
WorkerToPrimaryServer, WorkerToPrimaryServer, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down
3 changes: 1 addition & 2 deletions narwhal/primary/src/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use tracing::{debug, info};
use types::{
error::{DagError, DagResult},
metered_channel::{Receiver, Sender},
BatchDigest, Certificate, Header, PrimaryMessage, ReconfigureNotification, Round, Timestamp,
TimestampMs,
BatchDigest, Certificate, Header, ReconfigureNotification, Round, Timestamp, TimestampMs,
};

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use types::{
BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, LatestHeaderRequest, LatestHeaderResponse, Metadata, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimary,
PrimaryToPrimaryServer, PrimaryToPrimaryServer, ReconfigureNotification, Round,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer,
ReconfigureNotification, Round,
};

struct FetchCertificateProxy {
Expand Down Expand Up @@ -75,7 +75,7 @@ impl PrimaryToPrimary for FetchCertificateProxy {
&self,
_request: anemo::Request<LatestHeaderRequest>,
) -> Result<anemo::Response<LatestHeaderResponse>, anemo::rpc::Status> {
unimplemented!()
Ok(anemo::Response::new(LatestHeaderResponse { header: None }))
}
}

Expand Down
1 change: 0 additions & 1 deletion narwhal/primary/src/tests/proposer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ async fn equivocation_protection() {

// Ensure the proposer makes a correct header from the provided payload.
let header = rx_headers.recv().await.unwrap();
assert_eq!(header.round, 2);
assert_eq!(header.payload.get(&digest), Some(&worker_id));
assert!(header.verify(&committee, shared_worker_cache).is_ok());

Expand Down
12 changes: 5 additions & 7 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ use tracing::info;
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderBuilder, HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimary, PrimaryToPrimaryServer, PrimaryToPrimaryServer, PrimaryToWorker,
PrimaryToWorker, PrimaryToWorkerServer, PrimaryToWorkerServer, RequestBatchRequest,
RequestBatchResponse, Round, SequenceNumber, Transaction, Vote, WorkerBatchMessage,
WorkerDeleteBatchesMessage, WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker,
WorkerToWorker, WorkerToWorkerServer, WorkerToWorkerServer,
HeaderBuilder, LatestHeaderRequest, LatestHeaderResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer,
PrimaryToWorker, PrimaryToWorkerServer, RequestBatchRequest, RequestBatchResponse, Round,
SequenceNumber, Transaction, Vote, WorkerBatchMessage, WorkerDeleteBatchesMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage, WorkerToWorker, WorkerToWorkerServer,
};

pub mod cluster;
Expand Down
1 change: 0 additions & 1 deletion narwhal/types/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use rustversion::{beta, nightly, stable};
use std::{
env,
path::{Path, PathBuf},
Expand Down

0 comments on commit 4460d9e

Please sign in to comment.