Skip to content

Commit

Permalink
[Draft] Verify consensus message author matches with the sender (#15386)
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala authored and bchocho committed Jan 10, 2025
1 parent a060983 commit 4d461c8
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 35 deletions.
2 changes: 1 addition & 1 deletion config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl Default for ConsensusConfig {
num_bounded_executor_tasks: 16,
enable_pre_commit: true,
max_pending_rounds_in_commit_vote_cache: 100,
optimistic_sig_verification: false,
optimistic_sig_verification: true,
enable_round_timeout_msg: true,
enable_pipeline: false,
}
Expand Down
14 changes: 12 additions & 2 deletions consensus/consensus-types/src/order_vote_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{order_vote::OrderVote, quorum_cert::QuorumCert};
use crate::{common::Author, order_vote::OrderVote, quorum_cert::QuorumCert};
use anyhow::{ensure, Context};
use aptos_types::validator_verifier::ValidatorVerifier;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -46,7 +46,17 @@ impl OrderVoteMsg {

/// This function verifies the order_vote component in the order_vote_msg.
/// The quorum cert is verified in the round manager when the quorum certificate is used.
pub fn verify_order_vote(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify_order_vote(
&self,
sender: Author,
validator: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(
self.order_vote.author() == sender,
"Order vote author {:?} is different from the sender {:?}",
self.order_vote.author(),
sender
);
ensure!(
self.quorum_cert().certified_block() == self.order_vote().ledger_info().commit_info(),
"QuorumCert and OrderVote do not match"
Expand Down
10 changes: 8 additions & 2 deletions consensus/consensus-types/src/pipeline/commit_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::{Author, Round};
use anyhow::Context;
use anyhow::{ensure, Context};
use aptos_crypto::{bls12381, CryptoMaterialError};
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::{
Expand Down Expand Up @@ -101,7 +101,13 @@ impl CommitVote {

/// Verifies that the consensus data hash of LedgerInfo corresponds to the commit proposal,
/// and then verifies the signature.
pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.author() == sender,
"Commit vote author {:?} doesn't match with the sender {:?}",
self.author(),
sender
);
validator
.optimistic_verify(self.author(), &self.ledger_info, &self.signature)
.context("Failed to verify Commit Vote")
Expand Down
9 changes: 9 additions & 0 deletions consensus/consensus-types/src/proposal_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,19 @@ impl ProposalMsg {

pub fn verify(
&self,
sender: Author,
validator: &ValidatorVerifier,
proof_cache: &ProofCache,
quorum_store_enabled: bool,
) -> Result<()> {
if let Some(proposal_author) = self.proposal.author() {
ensure!(
proposal_author == sender,
"Proposal author {:?} doesn't match sender {:?}",
proposal_author,
sender
);
}
self.proposal().payload().map_or(Ok(()), |p| {
p.verify(validator, proof_cache, quorum_store_enabled)
})?;
Expand Down
10 changes: 8 additions & 2 deletions consensus/consensus-types/src/vote_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{sync_info::SyncInfo, vote::Vote};
use crate::{common::Author, sync_info::SyncInfo, vote::Vote};
use anyhow::ensure;
use aptos_crypto::HashValue;
use aptos_types::validator_verifier::ValidatorVerifier;
Expand Down Expand Up @@ -54,7 +54,13 @@ impl VoteMsg {
self.vote.vote_data().proposed().id()
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.vote().author() == sender,
"Vote author {:?} is different from the sender {:?}",
self.vote().author(),
sender,
);
ensure!(
self.vote().epoch() == self.sync_info.epoch(),
"VoteMsg has different epoch"
Expand Down
14 changes: 9 additions & 5 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ pub struct BufferManager {
commit_proof_rb_handle: Option<DropGuard>,

// message received from the network
commit_msg_rx:
Option<aptos_channels::aptos_channel::Receiver<AccountAddress, IncomingCommitRequest>>,
commit_msg_rx: Option<
aptos_channels::aptos_channel::Receiver<
AccountAddress,
(AccountAddress, IncomingCommitRequest),
>,
>,

persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,
persisting_phase_rx: Receiver<ExecutorResult<Round>>,
Expand Down Expand Up @@ -186,7 +190,7 @@ impl BufferManager {
commit_msg_tx: Arc<NetworkSender>,
commit_msg_rx: aptos_channels::aptos_channel::Receiver<
AccountAddress,
IncomingCommitRequest,
(AccountAddress, IncomingCommitRequest),
>,
persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,
persisting_phase_rx: Receiver<ExecutorResult<Round>>,
Expand Down Expand Up @@ -948,12 +952,12 @@ impl BufferManager {
let epoch_state = self.epoch_state.clone();
let bounded_executor = self.bounded_executor.clone();
spawn_named!("buffer manager verification", async move {
while let Some(commit_msg) = commit_msg_rx.next().await {
while let Some((sender, commit_msg)) = commit_msg_rx.next().await {
let tx = verified_commit_msg_tx.clone();
let epoch_state_clone = epoch_state.clone();
bounded_executor
.spawn(async move {
match commit_msg.req.verify(&epoch_state_clone.verifier) {
match commit_msg.req.verify(sender, &epoch_state_clone.verifier) {
Ok(_) => {
let _ = tx.unbounded_send(commit_msg);
},
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/pipeline/commit_reliable_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ pub enum CommitMessage {

impl CommitMessage {
/// Verify the signatures on the message
pub fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
pub fn verify(&self, sender: Author, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
match self {
CommitMessage::Vote(vote) => {
let _timer = counters::VERIFY_MSG
.with_label_values(&["commit_vote"])
.start_timer();
vote.verify(verifier)
vote.verify(sender, verifier)
},
CommitMessage::Decision(decision) => {
let _timer = counters::VERIFY_MSG
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/decoupled_execution_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn prepare_phases_and_buffer_manager(
execution_proxy: Arc<dyn StateComputer>,
safety_rules: Arc<dyn CommitSignerProvider>,
commit_msg_tx: NetworkSender,
commit_msg_rx: Receiver<AccountAddress, IncomingCommitRequest>,
commit_msg_rx: Receiver<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
persisting_proxy: Arc<dyn StateComputer>,
block_rx: UnboundedReceiver<OrderedBlocks>,
sync_rx: UnboundedReceiver<ResetRequest>,
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ pub trait TExecutionClient: Send + Sync {

struct BufferManagerHandle {
pub execute_tx: Option<UnboundedSender<OrderedBlocks>>,
pub commit_tx: Option<aptos_channel::Sender<AccountAddress, IncomingCommitRequest>>,
pub commit_tx:
Option<aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>>,
pub reset_tx_to_buffer_manager: Option<UnboundedSender<ResetRequest>>,
pub reset_tx_to_rand_manager: Option<UnboundedSender<ResetRequest>>,
}
Expand All @@ -130,7 +131,7 @@ impl BufferManagerHandle {
pub fn init(
&mut self,
execute_tx: UnboundedSender<OrderedBlocks>,
commit_tx: aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
commit_tx: aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
reset_tx_to_buffer_manager: UnboundedSender<ResetRequest>,
reset_tx_to_rand_manager: Option<UnboundedSender<ResetRequest>>,
) {
Expand Down Expand Up @@ -218,7 +219,7 @@ impl ExecutionProxyClient {
let (reset_buffer_manager_tx, reset_buffer_manager_rx) = unbounded::<ResetRequest>();

let (commit_msg_tx, commit_msg_rx) =
aptos_channel::new::<AccountAddress, IncomingCommitRequest>(
aptos_channel::new::<AccountAddress, (AccountAddress, IncomingCommitRequest)>(
QueueStyle::FIFO,
100,
Some(&counters::BUFFER_MANAGER_MSGS),
Expand Down Expand Up @@ -402,7 +403,7 @@ impl TExecutionClient for ExecutionProxyClient {
commit_msg: IncomingCommitRequest,
) -> Result<()> {
if let Some(tx) = &self.handle.read().commit_tx {
tx.push(peer_id, commit_msg)
tx.push(peer_id, (peer_id, commit_msg))
} else {
counters::EPOCH_MANAGER_ISSUES_DETAILS
.with_label_values(&["buffer_manager_not_started"])
Expand Down
19 changes: 9 additions & 10 deletions consensus/src/pipeline/tests/buffer_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn prepare_buffer_manager(
BufferManager,
Sender<OrderedBlocks>,
Sender<ResetRequest>,
aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
aptos_channels::UnboundedReceiver<Event<ConsensusMsg>>,
PipelinePhase<ExecutionSchedulePhase>,
PipelinePhase<ExecutionWaitPhase>,
Expand Down Expand Up @@ -122,11 +122,10 @@ pub fn prepare_buffer_manager(
validators.clone(),
);

let (msg_tx, msg_rx) = aptos_channel::new::<AccountAddress, IncomingCommitRequest>(
QueueStyle::FIFO,
channel_size,
None,
);
let (msg_tx, msg_rx) = aptos_channel::new::<
AccountAddress,
(AccountAddress, IncomingCommitRequest),
>(QueueStyle::FIFO, channel_size, None);

let (result_tx, result_rx) = create_channel::<OrderedBlocks>();
let state_computer = Arc::new(EmptyStateComputer::new(result_tx));
Expand Down Expand Up @@ -185,7 +184,7 @@ pub fn prepare_buffer_manager(
pub fn launch_buffer_manager() -> (
Sender<OrderedBlocks>,
Sender<ResetRequest>,
aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
aptos_channels::UnboundedReceiver<Event<ConsensusMsg>>,
HashValue,
Runtime,
Expand Down Expand Up @@ -233,20 +232,20 @@ pub fn launch_buffer_manager() -> (

async fn loopback_commit_vote(
msg: Event<ConsensusMsg>,
msg_tx: &aptos_channel::Sender<AccountAddress, IncomingCommitRequest>,
msg_tx: &aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingCommitRequest)>,
verifier: &ValidatorVerifier,
) {
match msg {
Event::RpcRequest(author, msg, protocol, callback) => {
if let ConsensusMsg::CommitMessage(msg) = msg {
msg.verify(verifier).unwrap();
msg.verify(author, verifier).unwrap();
let request = IncomingCommitRequest {
req: *msg,
protocol,
response_sender: callback,
};
// verify the message and send the message into self loop
msg_tx.push(author, request).ok();
msg_tx.push(author, (author, request)).ok();
}
},
_ => {
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/quorum_store/network_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ impl NetworkListener {
counters::QUORUM_STORE_MSG_COUNT
.with_label_values(&["NetworkListener::batchmsg"])
.inc();
let author = batch_msg.author();
// Batch msg verify function alreay ensures that the batch_msg is not empty.
let author = batch_msg.author().expect("Empty batch message");
let batches = batch_msg.take();
counters::RECEIVED_BATCH_MSG_COUNT.inc();

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ impl BatchMsg {
Ok(epoch)
}

pub fn author(&self) -> PeerId {
self.batches[0].author()
pub fn author(&self) -> Option<PeerId> {
self.batches.first().map(|batch| batch.author())
}

pub fn take(self) -> Vec<Batch> {
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl UnverifiedEvent {
//TODO: no need to sign and verify the proposal
UnverifiedEvent::ProposalMsg(p) => {
if !self_message {
p.verify(validator, proof_cache, quorum_store_enabled)?;
p.verify(peer_id, validator, proof_cache, quorum_store_enabled)?;
counters::VERIFY_MSG
.with_label_values(&["proposal"])
.observe(start_time.elapsed().as_secs_f64());
Expand All @@ -121,7 +121,7 @@ impl UnverifiedEvent {
},
UnverifiedEvent::VoteMsg(v) => {
if !self_message {
v.verify(validator)?;
v.verify(peer_id, validator)?;
counters::VERIFY_MSG
.with_label_values(&["vote"])
.observe(start_time.elapsed().as_secs_f64());
Expand All @@ -139,7 +139,7 @@ impl UnverifiedEvent {
},
UnverifiedEvent::OrderVoteMsg(v) => {
if !self_message {
v.verify_order_vote(validator)?;
v.verify_order_vote(peer_id, validator)?;
counters::VERIFY_MSG
.with_label_values(&["order_vote"])
.observe(start_time.elapsed().as_secs_f64());
Expand Down

0 comments on commit 4d461c8

Please sign in to comment.