diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index 021edf0b365b4..56b432a7f28b7 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -89,8 +89,8 @@ pub struct ConsensusConfig { pub rand_rb_config: ReliableBroadcastConfig, pub num_bounded_executor_tasks: u64, pub enable_pre_commit: bool, - pub max_pending_rounds_in_commit_vote_cache: u64, + pub enable_round_timeout_msg: bool, } /// Deprecated @@ -320,6 +320,7 @@ impl Default for ConsensusConfig { num_bounded_executor_tasks: 16, enable_pre_commit: true, max_pending_rounds_in_commit_vote_cache: 100, + enable_round_timeout_msg: false, } } } diff --git a/consensus/consensus-types/src/lib.rs b/consensus/consensus-types/src/lib.rs index bc70a1ad942f2..5d351d279bf8e 100644 --- a/consensus/consensus-types/src/lib.rs +++ b/consensus/consensus-types/src/lib.rs @@ -22,6 +22,7 @@ pub mod proposal_msg; pub mod quorum_cert; pub mod randomness; pub mod request_response; +pub mod round_timeout; pub mod safety_data; pub mod sync_info; pub mod timeout_2chain; diff --git a/consensus/consensus-types/src/round_timeout.rs b/consensus/consensus-types/src/round_timeout.rs new file mode 100644 index 0000000000000..c4596fc2a9d5b --- /dev/null +++ b/consensus/consensus-types/src/round_timeout.rs @@ -0,0 +1,177 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + common::{Author, Round}, + sync_info::SyncInfo, + timeout_2chain::TwoChainTimeout, +}; +use anyhow::{ensure, Context}; +use aptos_crypto::bls12381; +use aptos_short_hex_str::AsShortHexStr; +use aptos_types::validator_verifier::ValidatorVerifier; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)] +pub enum RoundTimeoutReason { + Unknown, + ProposalNotReceived, +} + +impl std::fmt::Display for RoundTimeoutReason { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RoundTimeoutReason::Unknown => write!(f, "Unknown"), + RoundTimeoutReason::ProposalNotReceived => write!(f, "ProposalNotReceived"), + } + } +} + +#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)] +pub struct RoundTimeout { + // The timeout + timeout: TwoChainTimeout, + author: Author, + reason: RoundTimeoutReason, + /// Signature on the Timeout + signature: bls12381::Signature, +} + +// this is required by structured log +impl std::fmt::Debug for RoundTimeout { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl std::fmt::Display for RoundTimeout { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "RoundTimeoutV2: [timeout: {}, author: {}, reason: {}]", + self.timeout, + self.author.short_str(), + self.reason + ) + } +} + +impl RoundTimeout { + pub fn new( + timeout: TwoChainTimeout, + author: Author, + reason: RoundTimeoutReason, + signature: bls12381::Signature, + ) -> Self { + Self { + timeout, + author, + reason, + signature, + } + } + + pub fn epoch(&self) -> u64 { + self.timeout.epoch() + } + + pub fn round(&self) -> Round { + self.timeout.round() + } + + pub fn two_chain_timeout(&self) -> &TwoChainTimeout { + &self.timeout + } + + pub fn author(&self) -> Author { + self.author + } + + pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + self.timeout.verify(validator)?; + validator + .verify( + self.author(), + &self.timeout.signing_format(), + &self.signature, + ) + .context("Failed to verify 2-chain timeout signature")?; + Ok(()) + } + + pub fn reason(&self) -> &RoundTimeoutReason { + &self.reason + } + + pub fn signature(&self) -> &bls12381::Signature { + &self.signature + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] +pub struct RoundTimeoutMsg { + /// The container for the vote (VoteData, LedgerInfo, Signature) + round_timeout: RoundTimeout, + /// Sync info carries information about highest QC, TC and LedgerInfo + sync_info: SyncInfo, +} + +impl std::fmt::Display for RoundTimeoutMsg { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "RoundTimeoutV2Msg: [{}], SyncInfo: [{}]", + self.round_timeout, self.sync_info + ) + } +} + +impl RoundTimeoutMsg { + pub fn new(round_timeout: RoundTimeout, sync_info: SyncInfo) -> Self { + Self { + round_timeout, + sync_info, + } + } + + /// SyncInfo of the given vote message + pub fn sync_info(&self) -> &SyncInfo { + &self.sync_info + } + + pub fn epoch(&self) -> u64 { + self.round_timeout.epoch() + } + + pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + ensure!( + self.round_timeout.epoch() == self.sync_info.epoch(), + "RoundTimeoutV2Msg has different epoch" + ); + ensure!( + self.round_timeout.round() > self.sync_info.highest_round(), + "Timeout Round should be higher than SyncInfo" + ); + ensure!( + self.round_timeout.two_chain_timeout().hqc_round() + <= self.sync_info.highest_certified_round(), + "2-chain Timeout hqc should be less or equal than the sync info hqc" + ); + // We're not verifying SyncInfo here yet: we are going to verify it only in case we need + // it. This way we avoid verifying O(n) SyncInfo messages while aggregating the votes + // (O(n^2) signature verifications). + self.round_timeout.verify(validator) + } + + pub fn round(&self) -> u64 { + self.round_timeout.round() + } + + pub fn author(&self) -> Author { + self.round_timeout.author() + } + + pub fn timeout(&self) -> RoundTimeout { + self.round_timeout.clone() + } +} diff --git a/consensus/consensus-types/src/vote.rs b/consensus/consensus-types/src/vote.rs index 8d5868a06c3b1..87359c31cfe70 100644 --- a/consensus/consensus-types/src/vote.rs +++ b/consensus/consensus-types/src/vote.rs @@ -140,6 +140,8 @@ impl Vote { /// Verifies that the consensus data hash of LedgerInfo corresponds to the vote info, /// and then verifies the signature. pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + // TODO(ibalajiarun): Ensure timeout is None if RoundTimeoutMsg is enabled. + ensure!( self.ledger_info.consensus_data_hash() == self.vote_data.hash(), "Vote's hash mismatch with LedgerInfo" diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index a6c43221f9a0d..4b25e780e82a0 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -1484,6 +1484,7 @@ impl EpochManager

{ ConsensusMsg::ProposalMsg(_) | ConsensusMsg::SyncInfo(_) | ConsensusMsg::VoteMsg(_) + | ConsensusMsg::RoundTimeoutMsg(_) | ConsensusMsg::OrderVoteMsg(_) | ConsensusMsg::CommitVoteMsg(_) | ConsensusMsg::CommitDecisionMsg(_) diff --git a/consensus/src/liveness/round_state.rs b/consensus/src/liveness/round_state.rs index 74e78e9c9f024..8e7602e61c0d5 100644 --- a/consensus/src/liveness/round_state.rs +++ b/consensus/src/liveness/round_state.rs @@ -8,8 +8,8 @@ use crate::{ util::time_service::{SendTask, TimeService}, }; use aptos_consensus_types::{ - common::Round, sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutWithPartialSignatures, - vote::Vote, + common::Round, round_timeout::RoundTimeout, sync_info::SyncInfo, + timeout_2chain::TwoChainTimeoutWithPartialSignatures, vote::Vote, }; use aptos_crypto::HashValue; use aptos_logger::{prelude::*, Schema}; @@ -159,6 +159,8 @@ pub struct RoundState { pending_votes: PendingVotes, // Vote sent locally for the current round. vote_sent: Option, + // Timeout sent locally for the current round. + timeout_sent: Option, // The handle to cancel previous timeout task when moving to next round. abort_handle: Option, } @@ -206,13 +208,14 @@ impl RoundState { timeout_sender, pending_votes, vote_sent: None, + timeout_sent: None, abort_handle: None, } } /// Return if already voted for timeout - pub fn is_vote_timeout(&self) -> bool { - self.vote_sent.as_ref().map_or(false, |v| v.is_timeout()) + pub fn is_timeout_sent(&self) -> bool { + self.vote_sent.as_ref().map_or(false, |v| v.is_timeout()) || self.timeout_sent.is_some() } /// Return the current round. @@ -251,6 +254,7 @@ impl RoundState { self.current_round = new_round; self.pending_votes = PendingVotes::new(); self.vote_sent = None; + self.timeout_sent = None; let timeout = self.setup_timeout(1); // The new round reason is QCReady in case both QC.round + 1 == new_round, otherwise // it's Timeout and TC.round + 1 == new_round. @@ -287,16 +291,38 @@ impl RoundState { } } + pub fn insert_round_timeout( + &mut self, + timeout: &RoundTimeout, + verifier: &ValidatorVerifier, + ) -> VoteReceptionResult { + if timeout.round() == self.current_round { + self.pending_votes.insert_round_timeout(timeout, verifier) + } else { + VoteReceptionResult::UnexpectedRound(timeout.round(), self.current_round) + } + } + pub fn record_vote(&mut self, vote: Vote) { if vote.vote_data().proposed().round() == self.current_round { self.vote_sent = Some(vote); } } + pub fn record_round_timeout(&mut self, timeout: RoundTimeout) { + if timeout.round() == self.current_round { + self.timeout_sent = Some(timeout) + } + } + pub fn vote_sent(&self) -> Option { self.vote_sent.clone() } + pub fn timeout_sent(&self) -> Option { + self.timeout_sent.clone() + } + /// Setup the timeout task and return the duration of the current timeout fn setup_timeout(&mut self, multiplier: u32) -> Duration { let timeout_sender = self.timeout_sender.clone(); diff --git a/consensus/src/logging.rs b/consensus/src/logging.rs index 27cba58a3280a..a6eb19cfc658b 100644 --- a/consensus/src/logging.rs +++ b/consensus/src/logging.rs @@ -40,6 +40,7 @@ pub enum LogEvent { ReceiveProposal, ReceiveSyncInfo, ReceiveVote, + ReceiveRoundTimeout, ReceiveOrderVote, RetrieveBlock, StateSync, diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 517c01fce472c..43a2240ca12ee 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -29,6 +29,7 @@ use aptos_consensus_types::{ pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote}, proof_of_store::{ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, + round_timeout::RoundTimeoutMsg, sync_info::SyncInfo, vote_msg::VoteMsg, }; @@ -317,6 +318,8 @@ impl NetworkSender { } pub fn broadcast_without_self(&self, msg: ConsensusMsg) { + fail_point!("consensus::send::any", |_| ()); + let self_author = self.author; let mut other_validators: Vec<_> = self .validators @@ -405,6 +408,12 @@ impl NetworkSender { self.broadcast(msg).await } + pub async fn broadcast_round_timeout(&self, round_timeout: RoundTimeoutMsg) { + fail_point!("consensus::send::round_timeout", |_| ()); + let msg = ConsensusMsg::RoundTimeoutMsg(Box::new(round_timeout)); + self.broadcast(msg).await + } + pub async fn broadcast_order_vote(&self, order_vote_msg: OrderVoteMsg) { fail_point!("consensus::send::order_vote", |_| ()); let msg = ConsensusMsg::OrderVoteMsg(Box::new(order_vote_msg)); @@ -749,6 +758,7 @@ impl NetworkTask { }, consensus_msg @ (ConsensusMsg::ProposalMsg(_) | ConsensusMsg::VoteMsg(_) + | ConsensusMsg::RoundTimeoutMsg(_) | ConsensusMsg::OrderVoteMsg(_) | ConsensusMsg::SyncInfo(_) | ConsensusMsg::EpochRetrievalRequest(_) diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index ea5b8646074f0..297b66ea7cfaf 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -18,6 +18,7 @@ use aptos_consensus_types::{ pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote}, proof_of_store::{ProofOfStoreMsg, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, + round_timeout::RoundTimeoutMsg, sync_info::SyncInfo, vote_msg::VoteMsg, }; @@ -80,6 +81,8 @@ pub enum ConsensusMsg { /// OrderVoteMsg is the struct that is broadcasted by a validator on receiving quorum certificate /// on a block. OrderVoteMsg(Box), + /// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round. + RoundTimeoutMsg(Box), } /// Network type for consensus @@ -107,6 +110,7 @@ impl ConsensusMsg { ConsensusMsg::CommitMessage(_) => "CommitMessage", ConsensusMsg::RandGenMessage(_) => "RandGenMessage", ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2", + ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2", } } } diff --git a/consensus/src/pending_votes.rs b/consensus/src/pending_votes.rs index b2177d2c5889a..a11b9d50dc976 100644 --- a/consensus/src/pending_votes.rs +++ b/consensus/src/pending_votes.rs @@ -12,9 +12,9 @@ use crate::counters; use aptos_consensus_types::{ common::Author, quorum_cert::QuorumCert, + round_timeout::RoundTimeout, timeout_2chain::{TwoChainTimeoutCertificate, TwoChainTimeoutWithPartialSignatures}, vote::Vote, - vote_data::VoteData, }; use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_logger::prelude::*; @@ -82,6 +82,83 @@ impl PendingVotes { } } + /// Insert a RoundTimeout and return a TimeoutCertificate if it can be formed + pub fn insert_round_timeout( + &mut self, + round_timeout: &RoundTimeout, + validator_verifier: &ValidatorVerifier, + ) -> VoteReceptionResult { + // + // Let's check if we can create a TC + // + + let timeout = round_timeout.two_chain_timeout(); + let signature = round_timeout.signature(); + + let validator_voting_power = validator_verifier + .get_voting_power(&round_timeout.author()) + .unwrap_or(0); + if validator_voting_power == 0 { + warn!( + "Received vote with no voting power, from {}", + round_timeout.author() + ); + } + let cur_epoch = round_timeout.epoch(); + let cur_round = round_timeout.round(); + + counters::CONSENSUS_CURRENT_ROUND_TIMEOUT_VOTED_POWER + .with_label_values(&[&round_timeout.author().to_string()]) + .set(validator_voting_power as f64); + counters::CONSENSUS_LAST_TIMEOUT_VOTE_EPOCH + .with_label_values(&[&round_timeout.author().to_string()]) + .set(cur_epoch as i64); + counters::CONSENSUS_LAST_TIMEOUT_VOTE_ROUND + .with_label_values(&[&round_timeout.author().to_string()]) + .set(cur_round as i64); + + let partial_tc = self + .maybe_partial_2chain_tc + .get_or_insert_with(|| TwoChainTimeoutWithPartialSignatures::new(timeout.clone())); + partial_tc.add(round_timeout.author(), timeout.clone(), signature.clone()); + let tc_voting_power = + match validator_verifier.check_voting_power(partial_tc.signers(), true) { + Ok(_) => { + return match partial_tc.aggregate_signatures(validator_verifier) { + Ok(tc_with_sig) => { + VoteReceptionResult::New2ChainTimeoutCertificate(Arc::new(tc_with_sig)) + }, + Err(e) => VoteReceptionResult::ErrorAggregatingTimeoutCertificate(e), + }; + }, + Err(VerifyError::TooLittleVotingPower { voting_power, .. }) => voting_power, + Err(error) => { + error!( + "MUST_FIX: 2-chain timeout vote received could not be added: {}, vote: {}", + error, timeout + ); + return VoteReceptionResult::ErrorAddingVote(error); + }, + }; + + // Echo timeout if receive f+1 timeout message. + if !self.echo_timeout { + let f_plus_one = validator_verifier.total_voting_power() + - validator_verifier.quorum_voting_power() + + 1; + if tc_voting_power >= f_plus_one { + self.echo_timeout = true; + return VoteReceptionResult::EchoTimeout(tc_voting_power); + } + } + + // + // No TC could be formed, return the TC's voting power + // + + VoteReceptionResult::VoteAdded(tc_voting_power) + } + /// Insert a vote and if the vote is valid, return a QuorumCertificate preferentially over a /// TimeoutCertificate if either can can be formed pub fn insert_vote( @@ -262,58 +339,6 @@ impl PendingVotes { VoteReceptionResult::VoteAdded(voting_power) } - pub fn aggregate_qc_now( - validator_verifier: &ValidatorVerifier, - li_with_sig: &LedgerInfoWithVerifiedSignatures, - vote_data: &VoteData, - ) -> VoteReceptionResult { - match li_with_sig.aggregate_signatures(validator_verifier) { - Ok(ledger_info_with_sig) => VoteReceptionResult::NewQuorumCertificate(Arc::new( - QuorumCert::new(vote_data.clone(), ledger_info_with_sig), - )), - Err(e) => VoteReceptionResult::ErrorAggregatingSignature(e), - } - } - - pub fn process_delayed_qc( - &mut self, - validator_verifier: &ValidatorVerifier, - vote: Vote, - ) -> VoteReceptionResult { - let li_digest = vote.ledger_info().hash(); - match self.li_digest_to_votes.get_mut(&li_digest) { - Some((_, li_with_sig)) => { - match validator_verifier.check_voting_power(li_with_sig.signatures().keys(), true) { - // a quorum of signature was reached, a new QC is formed - Ok(_) => { - Self::aggregate_qc_now(validator_verifier, li_with_sig, vote.vote_data()) - }, - - // not enough votes - Err(VerifyError::TooLittleVotingPower { .. }) => { - panic!("Delayed QC aggregation should not be triggered if we don't have enough votes to form a QC"); - }, - - // error - Err(error) => { - error!( - "MUST_FIX: vote received could not be added: {}, vote: {}", - error, vote - ); - VoteReceptionResult::ErrorAddingVote(error) - }, - } - }, - None => { - error!( - "No LedgerInfoWithSignatures found for the given digest: {}", - li_digest - ); - VoteReceptionResult::ErrorAddingVote(VerifyError::EmptySignature) - }, - } - } - pub fn drain_votes( &mut self, ) -> ( diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index f423d93d1e0ff..0cb07ce9b9f24 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -43,8 +43,9 @@ use aptos_consensus_types::{ proof_of_store::{ProofCache, ProofOfStoreMsg, SignedBatchInfoMsg}, proposal_msg::ProposalMsg, quorum_cert::QuorumCert, + round_timeout::{RoundTimeout, RoundTimeoutMsg, RoundTimeoutReason}, sync_info::SyncInfo, - timeout_2chain::TwoChainTimeoutCertificate, + timeout_2chain::{TwoChainTimeout, TwoChainTimeoutCertificate}, vote::Vote, vote_data::VoteData, vote_msg::VoteMsg, @@ -81,6 +82,7 @@ use tokio::{ pub enum UnverifiedEvent { ProposalMsg(Box), VoteMsg(Box), + RoundTimeoutMsg(Box), OrderVoteMsg(Box), SyncInfo(Box), BatchMsg(Box), @@ -122,6 +124,15 @@ impl UnverifiedEvent { } VerifiedEvent::VoteMsg(v) }, + UnverifiedEvent::RoundTimeoutMsg(v) => { + if !self_message { + v.verify(validator)?; + counters::VERIFY_MSG + .with_label_values(&["timeout"]) + .observe(start_time.elapsed().as_secs_f64()); + } + VerifiedEvent::RoundTimeoutMsg(v) + }, UnverifiedEvent::OrderVoteMsg(v) => { if !self_message { v.verify_order_vote(validator)?; @@ -177,6 +188,7 @@ impl UnverifiedEvent { UnverifiedEvent::BatchMsg(b) => b.epoch(), UnverifiedEvent::SignedBatchInfo(sd) => sd.epoch(), UnverifiedEvent::ProofOfStoreMsg(p) => p.epoch(), + UnverifiedEvent::RoundTimeoutMsg(t) => Ok(t.epoch()), } } } @@ -191,6 +203,7 @@ impl From for UnverifiedEvent { ConsensusMsg::BatchMsg(m) => UnverifiedEvent::BatchMsg(m), ConsensusMsg::SignedBatchInfo(m) => UnverifiedEvent::SignedBatchInfo(m), ConsensusMsg::ProofOfStoreMsg(m) => UnverifiedEvent::ProofOfStoreMsg(m), + ConsensusMsg::RoundTimeoutMsg(m) => UnverifiedEvent::RoundTimeoutMsg(m), _ => unreachable!("Unexpected conversion"), } } @@ -202,6 +215,7 @@ pub enum VerifiedEvent { ProposalMsg(Box), VerifiedProposalMsg(Box), VoteMsg(Box), + RoundTimeoutMsg(Box), OrderVoteMsg(Box), UnverifiedSyncInfo(Box), BatchMsg(Box), @@ -702,49 +716,91 @@ impl RoundManager { bail!("[RoundManager] sync_only flag is set, broadcasting SyncInfo"); } - let (is_nil_vote, mut timeout_vote) = match self.round_state.vote_sent() { - Some(vote) if vote.vote_data().proposed().round() == round => { - (vote.vote_data().is_for_nil(), vote) - }, - _ => { - // Didn't vote in this round yet, generate a backup vote - let nil_block = self - .proposal_generator - .generate_nil_block(round, self.proposer_election.clone())?; - info!( - self.new_log(LogEvent::VoteNIL), - "Planning to vote for a NIL block {}", nil_block + if self.local_config.enable_round_timeout_msg { + let timeout = if let Some(timeout) = self.round_state.timeout_sent() { + timeout + } else { + let timeout = TwoChainTimeout::new( + self.epoch_state().epoch, + round, + self.block_store.highest_quorum_cert().as_ref().clone(), ); - counters::VOTE_NIL_COUNT.inc(); - let nil_vote = self.vote_block(nil_block).await?; - (true, nil_vote) - }, - }; - - if !timeout_vote.is_timeout() { - let timeout = timeout_vote - .generate_2chain_timeout(self.block_store.highest_quorum_cert().as_ref().clone()); - let signature = self - .safety_rules - .lock() - .sign_timeout_with_qc( - &timeout, - self.block_store.highest_2chain_timeout_cert().as_deref(), + let signature = self + .safety_rules + .lock() + .sign_timeout_with_qc( + &timeout, + self.block_store.highest_2chain_timeout_cert().as_deref(), + ) + .context("[RoundManager] SafetyRules signs 2-chain timeout")?; + + // TODO(ibalajiarun): placeholder, update with proper reason. + let timeout_reason = RoundTimeoutReason::Unknown; + RoundTimeout::new( + timeout, + self.proposal_generator.author(), + timeout_reason, + signature, ) - .context("[RoundManager] SafetyRules signs 2-chain timeout")?; - timeout_vote.add_2chain_timeout(timeout, signature); - } + }; - self.round_state.record_vote(timeout_vote.clone()); - let timeout_vote_msg = VoteMsg::new(timeout_vote, self.block_store.sync_info()); - self.network.broadcast_timeout_vote(timeout_vote_msg).await; - warn!( - round = round, - remote_peer = self.proposer_election.get_valid_proposer(round), - voted_nil = is_nil_vote, - event = LogEvent::Timeout, - ); - bail!("Round {} timeout, broadcast to all peers", round); + self.round_state.record_round_timeout(timeout.clone()); + let round_timeout_msg = RoundTimeoutMsg::new(timeout, self.block_store.sync_info()); + self.network + .broadcast_round_timeout(round_timeout_msg) + .await; + warn!( + round = round, + remote_peer = self.proposer_election.get_valid_proposer(round), + event = LogEvent::Timeout, + ); + bail!("Round {} timeout, broadcast to all peers", round); + } else { + let (is_nil_vote, mut timeout_vote) = match self.round_state.vote_sent() { + Some(vote) if vote.vote_data().proposed().round() == round => { + (vote.vote_data().is_for_nil(), vote) + }, + _ => { + // Didn't vote in this round yet, generate a backup vote + let nil_block = self + .proposal_generator + .generate_nil_block(round, self.proposer_election.clone())?; + info!( + self.new_log(LogEvent::VoteNIL), + "Planning to vote for a NIL block {}", nil_block + ); + counters::VOTE_NIL_COUNT.inc(); + let nil_vote = self.vote_block(nil_block).await?; + (true, nil_vote) + }, + }; + + if !timeout_vote.is_timeout() { + let timeout = timeout_vote.generate_2chain_timeout( + self.block_store.highest_quorum_cert().as_ref().clone(), + ); + let signature = self + .safety_rules + .lock() + .sign_timeout_with_qc( + &timeout, + self.block_store.highest_2chain_timeout_cert().as_deref(), + ) + .context("[RoundManager] SafetyRules signs 2-chain timeout")?; + timeout_vote.add_2chain_timeout(timeout, signature); + } + + self.round_state.record_vote(timeout_vote.clone()); + let timeout_vote_msg = VoteMsg::new(timeout_vote, self.block_store.sync_info()); + self.network.broadcast_timeout_vote(timeout_vote_msg).await; + warn!( + round = round, + remote_peer = self.proposer_election.get_valid_proposer(round), + voted_nil = is_nil_vote, + event = LogEvent::Timeout, + ); + bail!("Round {} timeout, broadcast to all peers", round); + } } /// This function is called only after all the dependencies of the given QC have been retrieved. @@ -1302,7 +1358,7 @@ impl RoundManager { VoteReceptionResult::New2ChainTimeoutCertificate(tc) => { self.new_2chain_tc_aggregated(tc).await }, - VoteReceptionResult::EchoTimeout(_) if !self.round_state.is_vote_timeout() => { + VoteReceptionResult::EchoTimeout(_) if !self.round_state.is_timeout_sent() => { self.process_local_timeout(round).await }, VoteReceptionResult::VoteAdded(_) => { @@ -1314,6 +1370,70 @@ impl RoundManager { } } + async fn process_timeout_reception_result( + &mut self, + timeout: &RoundTimeout, + result: VoteReceptionResult, + ) -> anyhow::Result<()> { + let round = timeout.round(); + match result { + VoteReceptionResult::New2ChainTimeoutCertificate(tc) => { + self.new_2chain_tc_aggregated(tc).await + }, + VoteReceptionResult::EchoTimeout(_) if !self.round_state.is_timeout_sent() => { + self.process_local_timeout(round).await + }, + VoteReceptionResult::VoteAdded(_) | VoteReceptionResult::EchoTimeout(_) => Ok(()), + result @ VoteReceptionResult::NewQuorumCertificate(_) + | result @ VoteReceptionResult::DuplicateVote => { + bail!("Unexpected result from timeout processing: {:?}", result); + }, + e => Err(anyhow::anyhow!("{:?}", e)), + } + } + + pub async fn process_round_timeout_msg( + &mut self, + round_timeout_msg: RoundTimeoutMsg, + ) -> anyhow::Result<()> { + fail_point!("consensus::process_round_timeout_msg", |_| { + Err(anyhow::anyhow!( + "Injected error in process_round_timeout_msg" + )) + }); + // Check whether this validator is a valid recipient of the vote. + if self + .ensure_round_and_sync_up( + round_timeout_msg.round(), + round_timeout_msg.sync_info(), + round_timeout_msg.author(), + ) + .await + .context("[RoundManager] Stop processing vote")? + { + self.process_round_timeout(round_timeout_msg.timeout()) + .await + .context("[RoundManager] Add a new timeout")?; + } + Ok(()) + } + + async fn process_round_timeout(&mut self, timeout: RoundTimeout) -> anyhow::Result<()> { + info!( + self.new_log(LogEvent::ReceiveRoundTimeout) + .remote_peer(timeout.author()), + vote = %timeout, + epoch = timeout.epoch(), + round = timeout.round(), + ); + + let vote_reception_result = self + .round_state + .insert_round_timeout(&timeout, &self.epoch_state.verifier); + self.process_timeout_reception_result(&timeout, vote_reception_result) + .await + } + async fn process_order_vote_reception_result( &mut self, result: OrderVoteReceptionResult, @@ -1566,6 +1686,9 @@ impl RoundManager { VerifiedEvent::VoteMsg(vote_msg) => { monitor!("process_vote", self.process_vote_msg(*vote_msg).await) } + VerifiedEvent::RoundTimeoutMsg(timeout_msg) => { + monitor!("process_round_timeout", self.process_round_timeout_msg(*timeout_msg).await) + } VerifiedEvent::OrderVoteMsg(order_vote_msg) => { monitor!("process_order_vote", self.process_order_vote_msg(*order_vote_msg).await) } diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs index c12e476a7f56c..8112dfa3f42de 100644 --- a/consensus/src/round_manager_test.rs +++ b/consensus/src/round_manager_test.rs @@ -42,6 +42,7 @@ use aptos_consensus_types::{ common::{Author, Payload, Round}, pipeline::commit_decision::CommitDecision, proposal_msg::ProposalMsg, + round_timeout::RoundTimeoutMsg, sync_info::SyncInfo, timeout_2chain::{TwoChainTimeout, TwoChainTimeoutWithPartialSignatures}, utils::PayloadTxnsSize, @@ -447,6 +448,17 @@ impl NodeSetup { } } + pub async fn next_timeout(&mut self) -> RoundTimeoutMsg { + match self.next_network_message().await { + ConsensusMsg::RoundTimeoutMsg(v) => *v, + msg => panic!( + "Unexpected Consensus Message: {:?} on node {}", + msg, + self.identity_desc() + ), + } + } + pub async fn next_commit_decision(&mut self) -> CommitDecision { match self.next_network_message().await { ConsensusMsg::CommitDecisionMsg(v) => *v, @@ -1488,6 +1500,46 @@ fn nil_vote_on_timeout() { }); } +#[test] +/// Generate a Timeout upon timeout if no votes have been sent in the round. +fn timeout_round_on_timeout() { + let runtime = consensus_runtime(); + let mut playground = NetworkPlayground::new(runtime.handle().clone()); + let local_config = ConsensusConfig { + enable_round_timeout_msg: true, + ..Default::default() + }; + let mut nodes = NodeSetup::create_nodes( + &mut playground, + runtime.handle().clone(), + 1, + None, + None, + Some(local_config), + None, + None, + ); + let node = &mut nodes[0]; + let genesis = node.block_store.ordered_root(); + timed_block_on(&runtime, async { + node.next_proposal().await; + // Process the outgoing vote message and verify that it contains a round signature + // and that the vote extends genesis. + node.round_manager + .process_local_timeout(1) + .await + .unwrap_err(); + let timeout_msg = node.next_timeout().await; + + let timeout = timeout_msg.timeout(); + + assert_eq!(timeout.round(), 1); + assert_eq!(timeout.author(), node.signer.author()); + assert_eq!(timeout.epoch(), 1); + assert_eq!(timeout.two_chain_timeout().hqc_round(), genesis.round()); + }); +} + #[test] /// If the node votes in a round, upon timeout the same vote is re-sent with a timeout signature. fn vote_resent_on_timeout() { @@ -1529,6 +1581,50 @@ fn vote_resent_on_timeout() { }); } +#[test] +/// If the node votes in a round, upon timeout the same vote is re-sent with a timeout signature. +fn timeout_sent_on_timeout_after_vote() { + let runtime = consensus_runtime(); + let mut playground = NetworkPlayground::new(runtime.handle().clone()); + let local_config = ConsensusConfig { + enable_round_timeout_msg: true, + ..Default::default() + }; + let mut nodes = NodeSetup::create_nodes( + &mut playground, + runtime.handle().clone(), + 1, + None, + None, + Some(local_config), + None, + None, + ); + let node = &mut nodes[0]; + timed_block_on(&runtime, async { + let proposal_msg = node.next_proposal().await; + let id = proposal_msg.proposal().id(); + node.round_manager + .process_proposal_msg(proposal_msg) + .await + .unwrap(); + let vote_msg = node.next_vote().await; + let vote = vote_msg.vote(); + assert!(!vote.is_timeout()); + assert_eq!(vote.vote_data().proposed().id(), id); + // Process the outgoing vote message and verify that it contains a round signature + // and that the vote is the same as above. + node.round_manager + .process_local_timeout(1) + .await + .unwrap_err(); + let timeout_msg = node.next_timeout().await; + + assert_eq!(timeout_msg.round(), vote.vote_data().proposed().round()); + assert_eq!(timeout_msg.sync_info(), vote_msg.sync_info()); + }); +} + #[test] #[ignore] // TODO: this test needs to be fixed! fn sync_on_partial_newer_sync_info() { diff --git a/testsuite/forge/src/backend/local/node.rs b/testsuite/forge/src/backend/local/node.rs index d0beef57a2746..e1a61dacb4ae1 100644 --- a/testsuite/forge/src/backend/local/node.rs +++ b/testsuite/forge/src/backend/local/node.rs @@ -197,7 +197,7 @@ impl LocalNode { &self.config } - pub(crate) fn config_mut(&mut self) -> &mut NodeConfig { + pub fn config_mut(&mut self) -> &mut NodeConfig { &mut self.config } diff --git a/testsuite/forge/src/interface/aptos.rs b/testsuite/forge/src/interface/aptos.rs index 6e38ae931dfed..a77bad1eeadab 100644 --- a/testsuite/forge/src/interface/aptos.rs +++ b/testsuite/forge/src/interface/aptos.rs @@ -113,6 +113,7 @@ impl<'t> AptosContext<'t> { } } +#[derive(Clone)] pub struct AptosPublicInfo { chain_id: ChainId, inspection_service_url: Url, diff --git a/testsuite/generate-format/src/consensus.rs b/testsuite/generate-format/src/consensus.rs index 50db48dcc165f..5ef14eb37c69b 100644 --- a/testsuite/generate-format/src/consensus.rs +++ b/testsuite/generate-format/src/consensus.rs @@ -115,6 +115,7 @@ pub fn get_registry() -> Result { tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; + tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; tracer.trace_type::(&samples)?; diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index 9548f063c237d..1e81da1a2692b 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -421,6 +421,10 @@ ConsensusMsg: OrderVoteMsg: NEWTYPE: TYPENAME: OrderVoteMsg + 19: + RoundTimeoutMsg: + NEWTYPE: + TYPENAME: RoundTimeoutMsg ContractEvent: ENUM: 0: @@ -828,6 +832,28 @@ RawTransaction: - expiration_timestamp_secs: U64 - chain_id: TYPENAME: ChainId +RoundTimeout: + STRUCT: + - timeout: + TYPENAME: TwoChainTimeout + - author: + TYPENAME: AccountAddress + - reason: + TYPENAME: RoundTimeoutReason + - signature: + TYPENAME: Signature +RoundTimeoutMsg: + STRUCT: + - round_timeout: + TYPENAME: RoundTimeout + - sync_info: + TYPENAME: SyncInfo +RoundTimeoutReason: + ENUM: + 0: + Unknown: UNIT + 1: + ProposalNotReceived: UNIT Script: STRUCT: - code: BYTES diff --git a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs index 56e6cea9fa2d5..30cb64a7ade9f 100644 --- a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs +++ b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs @@ -9,7 +9,7 @@ use aptos_forge::{ test_utils::consensus_utils::{ no_failure_injection, test_consensus_fault_tolerance, FailPointFailureInjection, NodeState, }, - LocalSwarm, Swarm, SwarmExt, + LocalSwarm, NodeExt, Swarm, SwarmExt, }; use aptos_logger::info; use rand::{self, rngs::SmallRng, Rng, SeedableRng}; @@ -550,3 +550,64 @@ async fn test_alternating_having_consensus() { ) .await; } + +#[tokio::test] +async fn test_round_timeout_msg_rollout() { + let num_validators = 3; + + let mut swarm = create_swarm(num_validators, 1).await; + + let (validator_clients, public_info) = { + ( + swarm.get_validator_clients_with_names(), + swarm.aptos_public_info(), + ) + }; + test_consensus_fault_tolerance( + validator_clients.clone(), + public_info.clone(), + 3, + 5.0, + 1, + no_failure_injection(), + Box::new( + move |_, executed_epochs, executed_rounds, executed_transactions, _, _| { + successful_criteria(executed_epochs, executed_rounds, executed_transactions); + Ok(()) + }, + ), + true, + false, + ) + .await + .unwrap(); + + for val in swarm.validators_mut() { + val.stop(); + val.config_mut().consensus.enable_round_timeout_msg = true; + val.start().unwrap(); + + val.wait_until_healthy(Instant::now().checked_add(Duration::from_secs(60)).unwrap()) + .await + .unwrap(); + + test_consensus_fault_tolerance( + validator_clients.clone(), + public_info.clone(), + 1, + 30.0, + 1, + no_failure_injection(), + Box::new( + move |_, executed_epochs, executed_rounds, executed_transactions, _, _| { + successful_criteria(executed_epochs, executed_rounds, executed_transactions); + Ok(()) + }, + ), + true, + false, + ) + .await + .unwrap(); + } +}