Skip to content

Commit

Permalink
[consensus] split round timeout msg out of vote msg (#14433)
Browse files Browse the repository at this point in the history
* [consensus] split round timeout msg out of vote msg
* Smoke test for round timeout msg rollout
* update goldenfiles
  • Loading branch information
ibalajiarun authored Oct 2, 2024
1 parent 6c5ef3a commit 628e88b
Show file tree
Hide file tree
Showing 17 changed files with 658 additions and 102 deletions.
3 changes: 2 additions & 1 deletion config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ pub struct ConsensusConfig {
pub rand_rb_config: ReliableBroadcastConfig,
pub num_bounded_executor_tasks: u64,
pub enable_pre_commit: bool,

pub max_pending_rounds_in_commit_vote_cache: u64,
pub enable_round_timeout_msg: bool,
}

/// Deprecated
Expand Down Expand Up @@ -320,6 +320,7 @@ impl Default for ConsensusConfig {
num_bounded_executor_tasks: 16,
enable_pre_commit: true,
max_pending_rounds_in_commit_vote_cache: 100,
enable_round_timeout_msg: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions consensus/consensus-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod proposal_msg;
pub mod quorum_cert;
pub mod randomness;
pub mod request_response;
pub mod round_timeout;
pub mod safety_data;
pub mod sync_info;
pub mod timeout_2chain;
Expand Down
177 changes: 177 additions & 0 deletions consensus/consensus-types/src/round_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{Author, Round},
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeout,
};
use anyhow::{ensure, Context};
use aptos_crypto::bls12381;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::validator_verifier::ValidatorVerifier;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub enum RoundTimeoutReason {
Unknown,
ProposalNotReceived,
}

impl std::fmt::Display for RoundTimeoutReason {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
RoundTimeoutReason::Unknown => write!(f, "Unknown"),
RoundTimeoutReason::ProposalNotReceived => write!(f, "ProposalNotReceived"),
}
}
}

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct RoundTimeout {
// The timeout
timeout: TwoChainTimeout,
author: Author,
reason: RoundTimeoutReason,
/// Signature on the Timeout
signature: bls12381::Signature,
}

// this is required by structured log
impl std::fmt::Debug for RoundTimeout {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self)
}
}

impl std::fmt::Display for RoundTimeout {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RoundTimeoutV2: [timeout: {}, author: {}, reason: {}]",
self.timeout,
self.author.short_str(),
self.reason
)
}
}

impl RoundTimeout {
pub fn new(
timeout: TwoChainTimeout,
author: Author,
reason: RoundTimeoutReason,
signature: bls12381::Signature,
) -> Self {
Self {
timeout,
author,
reason,
signature,
}
}

pub fn epoch(&self) -> u64 {
self.timeout.epoch()
}

pub fn round(&self) -> Round {
self.timeout.round()
}

pub fn two_chain_timeout(&self) -> &TwoChainTimeout {
&self.timeout
}

pub fn author(&self) -> Author {
self.author
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
self.timeout.verify(validator)?;
validator
.verify(
self.author(),
&self.timeout.signing_format(),
&self.signature,
)
.context("Failed to verify 2-chain timeout signature")?;
Ok(())
}

pub fn reason(&self) -> &RoundTimeoutReason {
&self.reason
}

pub fn signature(&self) -> &bls12381::Signature {
&self.signature
}
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct RoundTimeoutMsg {
/// The container for the vote (VoteData, LedgerInfo, Signature)
round_timeout: RoundTimeout,
/// Sync info carries information about highest QC, TC and LedgerInfo
sync_info: SyncInfo,
}

impl std::fmt::Display for RoundTimeoutMsg {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RoundTimeoutV2Msg: [{}], SyncInfo: [{}]",
self.round_timeout, self.sync_info
)
}
}

impl RoundTimeoutMsg {
pub fn new(round_timeout: RoundTimeout, sync_info: SyncInfo) -> Self {
Self {
round_timeout,
sync_info,
}
}

/// SyncInfo of the given vote message
pub fn sync_info(&self) -> &SyncInfo {
&self.sync_info
}

pub fn epoch(&self) -> u64 {
self.round_timeout.epoch()
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.round_timeout.epoch() == self.sync_info.epoch(),
"RoundTimeoutV2Msg has different epoch"
);
ensure!(
self.round_timeout.round() > self.sync_info.highest_round(),
"Timeout Round should be higher than SyncInfo"
);
ensure!(
self.round_timeout.two_chain_timeout().hqc_round()
<= self.sync_info.highest_certified_round(),
"2-chain Timeout hqc should be less or equal than the sync info hqc"
);
// We're not verifying SyncInfo here yet: we are going to verify it only in case we need
// it. This way we avoid verifying O(n) SyncInfo messages while aggregating the votes
// (O(n^2) signature verifications).
self.round_timeout.verify(validator)
}

pub fn round(&self) -> u64 {
self.round_timeout.round()
}

pub fn author(&self) -> Author {
self.round_timeout.author()
}

pub fn timeout(&self) -> RoundTimeout {
self.round_timeout.clone()
}
}
2 changes: 2 additions & 0 deletions consensus/consensus-types/src/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl Vote {
/// Verifies that the consensus data hash of LedgerInfo corresponds to the vote info,
/// and then verifies the signature.
pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
// TODO(ibalajiarun): Ensure timeout is None if RoundTimeoutMsg is enabled.

ensure!(
self.ledger_info.consensus_data_hash() == self.vote_data.hash(),
"Vote's hash mismatch with LedgerInfo"
Expand Down
1 change: 1 addition & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
ConsensusMsg::ProposalMsg(_)
| ConsensusMsg::SyncInfo(_)
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::RoundTimeoutMsg(_)
| ConsensusMsg::OrderVoteMsg(_)
| ConsensusMsg::CommitVoteMsg(_)
| ConsensusMsg::CommitDecisionMsg(_)
Expand Down
34 changes: 30 additions & 4 deletions consensus/src/liveness/round_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
util::time_service::{SendTask, TimeService},
};
use aptos_consensus_types::{
common::Round, sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutWithPartialSignatures,
vote::Vote,
common::Round, round_timeout::RoundTimeout, sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutWithPartialSignatures, vote::Vote,
};
use aptos_crypto::HashValue;
use aptos_logger::{prelude::*, Schema};
Expand Down Expand Up @@ -159,6 +159,8 @@ pub struct RoundState {
pending_votes: PendingVotes,
// Vote sent locally for the current round.
vote_sent: Option<Vote>,
// Timeout sent locally for the current round.
timeout_sent: Option<RoundTimeout>,
// The handle to cancel previous timeout task when moving to next round.
abort_handle: Option<AbortHandle>,
}
Expand Down Expand Up @@ -206,13 +208,14 @@ impl RoundState {
timeout_sender,
pending_votes,
vote_sent: None,
timeout_sent: None,
abort_handle: None,
}
}

/// Return if already voted for timeout
pub fn is_vote_timeout(&self) -> bool {
self.vote_sent.as_ref().map_or(false, |v| v.is_timeout())
pub fn is_timeout_sent(&self) -> bool {
self.vote_sent.as_ref().map_or(false, |v| v.is_timeout()) || self.timeout_sent.is_some()
}

/// Return the current round.
Expand Down Expand Up @@ -251,6 +254,7 @@ impl RoundState {
self.current_round = new_round;
self.pending_votes = PendingVotes::new();
self.vote_sent = None;
self.timeout_sent = None;
let timeout = self.setup_timeout(1);
// The new round reason is QCReady in case both QC.round + 1 == new_round, otherwise
// it's Timeout and TC.round + 1 == new_round.
Expand Down Expand Up @@ -287,16 +291,38 @@ impl RoundState {
}
}

pub fn insert_round_timeout(
&mut self,
timeout: &RoundTimeout,
verifier: &ValidatorVerifier,
) -> VoteReceptionResult {
if timeout.round() == self.current_round {
self.pending_votes.insert_round_timeout(timeout, verifier)
} else {
VoteReceptionResult::UnexpectedRound(timeout.round(), self.current_round)
}
}

pub fn record_vote(&mut self, vote: Vote) {
if vote.vote_data().proposed().round() == self.current_round {
self.vote_sent = Some(vote);
}
}

pub fn record_round_timeout(&mut self, timeout: RoundTimeout) {
if timeout.round() == self.current_round {
self.timeout_sent = Some(timeout)
}
}

pub fn vote_sent(&self) -> Option<Vote> {
self.vote_sent.clone()
}

pub fn timeout_sent(&self) -> Option<RoundTimeout> {
self.timeout_sent.clone()
}

/// Setup the timeout task and return the duration of the current timeout
fn setup_timeout(&mut self, multiplier: u32) -> Duration {
let timeout_sender = self.timeout_sender.clone();
Expand Down
1 change: 1 addition & 0 deletions consensus/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum LogEvent {
ReceiveProposal,
ReceiveSyncInfo,
ReceiveVote,
ReceiveRoundTimeout,
ReceiveOrderVote,
RetrieveBlock,
StateSync,
Expand Down
10 changes: 10 additions & 0 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use aptos_consensus_types::{
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStore, ProofOfStoreMsg, SignedBatchInfo, SignedBatchInfoMsg},
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
};
Expand Down Expand Up @@ -317,6 +318,8 @@ impl NetworkSender {
}

pub fn broadcast_without_self(&self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

let self_author = self.author;
let mut other_validators: Vec<_> = self
.validators
Expand Down Expand Up @@ -405,6 +408,12 @@ impl NetworkSender {
self.broadcast(msg).await
}

pub async fn broadcast_round_timeout(&self, round_timeout: RoundTimeoutMsg) {
fail_point!("consensus::send::round_timeout", |_| ());
let msg = ConsensusMsg::RoundTimeoutMsg(Box::new(round_timeout));
self.broadcast(msg).await
}

pub async fn broadcast_order_vote(&self, order_vote_msg: OrderVoteMsg) {
fail_point!("consensus::send::order_vote", |_| ());
let msg = ConsensusMsg::OrderVoteMsg(Box::new(order_vote_msg));
Expand Down Expand Up @@ -749,6 +758,7 @@ impl NetworkTask {
},
consensus_msg @ (ConsensusMsg::ProposalMsg(_)
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::RoundTimeoutMsg(_)
| ConsensusMsg::OrderVoteMsg(_)
| ConsensusMsg::SyncInfo(_)
| ConsensusMsg::EpochRetrievalRequest(_)
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use aptos_consensus_types::{
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStoreMsg, SignedBatchInfoMsg},
proposal_msg::ProposalMsg,
round_timeout::RoundTimeoutMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
};
Expand Down Expand Up @@ -80,6 +81,8 @@ pub enum ConsensusMsg {
/// OrderVoteMsg is the struct that is broadcasted by a validator on receiving quorum certificate
/// on a block.
OrderVoteMsg(Box<OrderVoteMsg>),
/// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round.
RoundTimeoutMsg(Box<RoundTimeoutMsg>),
}

/// Network type for consensus
Expand Down Expand Up @@ -107,6 +110,7 @@ impl ConsensusMsg {
ConsensusMsg::CommitMessage(_) => "CommitMessage",
ConsensusMsg::RandGenMessage(_) => "RandGenMessage",
ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2",
ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2",
}
}
}
Expand Down
Loading

0 comments on commit 628e88b

Please sign in to comment.