From fa7d892799fd4170455c9e119d560f7adf22896f Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 13 Jul 2023 21:26:50 -0700 Subject: [PATCH] attempt #2 --- consensus/src/dag/dag_fetcher.rs | 69 ++++++++++++------------ consensus/src/dag/dag_store.rs | 70 ++++++------------------ consensus/src/dag/order_rule.rs | 7 ++- consensus/src/dag/tests/fetcher_test.rs | 33 ++++++------ consensus/src/dag/types.rs | 72 ++++++++++++++++--------- 5 files changed, 120 insertions(+), 131 deletions(-) diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index 976f207dae540..d1e6435c11c11 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -15,7 +15,7 @@ use aptos_consensus_types::common::Author; use aptos_infallible::RwLock; use aptos_logger::error; use aptos_types::epoch_state::EpochState; -use std::{sync::Arc, time::Duration}; +use std::{ops::Deref, sync::Arc, time::Duration}; use thiserror::Error as ThisError; use tokio::sync::{ mpsc::{Receiver, Sender}, @@ -91,20 +91,9 @@ impl DagFetcher { local_request.notify(); continue; } - - let (lowest_round, bitmask, missing_nodes) = - if let Some((lowest_round, bitmask, missing_nodes)) = dag_reader.bitmask() { - (lowest_round, bitmask, missing_nodes) - } else { - error!("Incomplete round not found, but fetch request received"); - continue; - }; - RemoteFetchRequest::new( local_request.node().metadata().clone(), - lowest_round, - bitmask, - missing_nodes, + dag_reader.bitmask(local_request.node().round()), ) }; let network_request = DAGMessage::from(remote_request.clone()).into_network_message(); @@ -117,22 +106,21 @@ impl DagFetcher { .and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier)) { let ceritified_nodes = response.certified_nodes(); - if ceritified_nodes.len() != remote_request.missing_count() { - error!( - "expected {} nodes, received {}", - remote_request.missing_count(), - ceritified_nodes.len() - ); - continue; - } // TODO: support chunk response or fallback to state sync - let mut dag_writer = self.dag.write(); - for node in ceritified_nodes { - if let Err(e) = dag_writer.add_node(node) { - error!("Failed to add node {}", e); + { + let mut dag_writer = self.dag.write(); + for node in ceritified_nodes { + if let Err(e) = dag_writer.add_node(node) { + error!("Failed to add node {}", e); + } } } - local_request.notify(); + + if self.dag.read().all_exists(local_request.node().parents()) { + local_request.notify(); + } else { + // TODO: implement retry logic + } } } } @@ -140,8 +128,10 @@ impl DagFetcher { #[derive(Debug, ThisError)] pub enum FetchHandleError { - #[error("not enough nodes to satisfy request")] - NotEnoughNodes, + #[error("target node is not present")] + TargetNotPresent, + #[error("causal parents not present")] + NotPresent, } pub struct FetchHandler { @@ -161,16 +151,25 @@ impl RpcHandler for FetchHandler { fn process(&mut self, message: Self::Request) -> anyhow::Result { let dag_reader = self.dag.read(); - let nodes = dag_reader.get_missing_nodes(message.start_round(), message.exists_bitmask()); - // If this peer cannot satisfy the request, return an error. - ensure!( - nodes.len() == message.missing_count(), - FetchHandleError::NotEnoughNodes - ); + let target_node = dag_reader + .get_node(message.target()) + .ok_or(FetchHandleError::TargetNotPresent)?; + + let certified_nodes: Vec<_> = dag_reader + .reachable( + &target_node, + Some(message.exists_bitmask().first_round()), + |_| true, + ) + .skip(1) // Skip target node + .map(|node_status| node_status.as_node().clone().deref().clone()) + .collect(); + + ensure!(certified_nodes.len() > 0, FetchHandleError::NotPresent); Ok(FetchResponse::new( message.target().epoch(), - nodes.iter().map(|n| n.as_ref().clone()).collect(), + certified_nodes, )) } } diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 40f1ade096c3f..d06184ef461a4 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -1,9 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use super::types::{DagSnapshotBitmask, NodeMetadata}; use crate::dag::{ storage::DAGStorage, - types::{CertifiedNode, NodeCertificate, NodeMetadata}, + types::{CertifiedNode, NodeCertificate}, }; use anyhow::{anyhow, ensure}; use aptos_consensus_types::common::{Author, Round}; @@ -217,6 +218,7 @@ impl Dag { &self, from: &Arc, until: Option, + filter: impl Fn(&NodeStatus) -> bool, ) -> impl Iterator { let until = until.unwrap_or(self.lowest_round()); let mut reachable_filter = Self::reachable_filter(from.digest()); @@ -226,45 +228,10 @@ impl Dag { .flat_map(|(_, round_ref)| round_ref.iter()) .flatten() .filter(move |node_status| { - matches!(node_status, NodeStatus::Unordered(_)) - && reachable_filter(node_status.as_node()) + filter(node_status) && reachable_filter(node_status.as_node()) }) } - pub fn get_missing_nodes( - &self, - start_round: Round, - bitmask: &[Vec], - ) -> Vec> { - bitmask - .iter() - .enumerate() - .flat_map(move |(round_idx, round)| { - round - .iter() - .enumerate() - .filter_map(move |(author_idx, exists)| { - if *exists { - None - } else { - Some((start_round + (round_idx as u64), author_idx)) - } - }) - }) - .filter_map(|(round, author_idx)| { - if let Some(Some(node)) = self - .nodes_by_round - .get(&round) - .and_then(|round| round.get(author_idx)) - { - Some(node.clone()) - } else { - None - } - }) - .collect() - } - pub fn get_strong_links_for_round( &self, round: Round, @@ -296,30 +263,23 @@ impl Dag { None } - pub fn bitmask(&self) -> Option<(Round, Vec>, usize)> { + pub fn bitmask(&self, target_round: Round) -> DagSnapshotBitmask { let lowest_round = match self.lowest_incomplete_round() { - Some(lowest_round) => lowest_round, - None => return None, + Some(round) => round, + None => { + return DagSnapshotBitmask::new(self.highest_round() + 1, vec![vec![ + false; + self.author_to_index.len() + ]]); + }, }; - let mut missing_count = 0; let bitmask = self .nodes_by_round - .iter() - .skip_while(|(round, _)| **round < lowest_round) - .map(|(_, round_nodes)| { - round_nodes - .iter() - .map(|node| { - if node.is_none() { - missing_count += 1; - } - node.is_some() - }) - .collect() - }) + .range(lowest_round..target_round) + .map(|(_, round_nodes)| round_nodes.iter().map(|node| node.is_some()).collect()) .collect(); - Some((lowest_round, bitmask, missing_count)) + DagSnapshotBitmask::new(lowest_round, bitmask) } } diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 1585c6cf7370d..6d6f0e700ee56 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use super::dag_store::NodeStatus; use crate::{ dag::{anchor_election::AnchorElection, dag_store::Dag, types::NodeMetadata, CertifiedNode}, experimental::buffer_manager::OrderedBlocks, @@ -104,7 +105,11 @@ impl OrderRule { && *metadata.author() == self.anchor_election.get_anchor(metadata.round()) }; while let Some(prev_anchor) = dag_reader - .reachable(¤t_anchor, Some(self.lowest_unordered_anchor_round)) + .reachable( + ¤t_anchor, + Some(self.lowest_unordered_anchor_round), + |node_status| matches!(node_status, NodeStatus::Unordered(_)), + ) .map(|node_status| node_status.as_node()) .find(|node| is_anchor(node.metadata())) { diff --git a/consensus/src/dag/tests/fetcher_test.rs b/consensus/src/dag/tests/fetcher_test.rs index 5244d41ba0dd2..eaad3865af4c8 100644 --- a/consensus/src/dag/tests/fetcher_test.rs +++ b/consensus/src/dag/tests/fetcher_test.rs @@ -1,11 +1,11 @@ // Copyright © Aptos Foundation -use super::{dag_test::MockStorage, helpers::new_node}; +use super::dag_test::MockStorage; use crate::dag::{ dag_fetcher::FetchHandler, dag_store::Dag, tests::helpers::new_certified_node, - types::{FetchResponse, RemoteFetchRequest}, + types::{DagSnapshotBitmask, FetchResponse, RemoteFetchRequest}, RpcHandler, }; use aptos_infallible::RwLock; @@ -34,26 +34,27 @@ fn test_dag_fetcher_receiver() { first_round_nodes.push(node); } - let target_node = new_node(2, 100, signers[0].author(), vec![]); + // Round 2 - node 0 + let target_node = new_certified_node(2, signers[0].author(), vec![ + first_round_nodes[0].certificate() + ]); - let request = - RemoteFetchRequest::new(target_node.metadata().clone(), 1, vec![vec![false; 4]], 4); + let request = RemoteFetchRequest::new( + target_node.metadata().clone(), + DagSnapshotBitmask::new(1, vec![vec![false; 4]]), + ); assert_eq!( - fetcher.process(request).unwrap_err().to_string(), - "not enough nodes to satisfy request" + fetcher.process(request.clone()).unwrap_err().to_string(), + "target node is not present" ); - // Round 1 - node 3 - { - let node = new_certified_node(1, signers[3].author(), vec![]); - assert!(dag.write().add_node(node.clone()).is_ok()); - first_round_nodes.push(node); - } + // Add Round 2 - node 0 + assert!(dag.write().add_node(target_node.clone()).is_ok()); - let request = - RemoteFetchRequest::new(target_node.metadata().clone(), 1, vec![vec![false; 4]], 4); assert_ok_eq!( fetcher.process(request), - FetchResponse::new(0, first_round_nodes) + FetchResponse::new(1, vec![first_round_nodes[0].clone()]) ); } + +// TODO: add more tests after commit rule tests diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 46c121ac1ae2f..9a7f4194be69d 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -481,23 +481,14 @@ impl BroadcastStatus for CertificateAckState { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RemoteFetchRequest { target: NodeMetadata, - start_round: Round, - exists_bitmask: Vec>, - missing_count: usize, + exists_bitmask: DagSnapshotBitmask, } impl RemoteFetchRequest { - pub fn new( - target: NodeMetadata, - start_round: Round, - exists_bitmask: Vec>, - missing_count: usize, - ) -> Self { + pub fn new(target: NodeMetadata, exists_bitmask: DagSnapshotBitmask) -> Self { Self { target, - start_round, exists_bitmask, - missing_count, } } @@ -505,24 +496,16 @@ impl RemoteFetchRequest { &self.target } - pub fn start_round(&self) -> Round { - self.start_round - } - - pub fn exists_bitmask(&self) -> &Vec> { + pub fn exists_bitmask(&self) -> &DagSnapshotBitmask { &self.exists_bitmask } - - pub fn missing_count(&self) -> usize { - self.missing_count - } } impl TDAGMessage for RemoteFetchRequest { fn verify(&self, _verifier: &ValidatorVerifier) -> anyhow::Result<()> { ensure!( - self.target.round >= self.start_round + self.exists_bitmask.len() as u64, - "target node round should be greater or equal to highest requested round" + self.target.round > self.exists_bitmask.last_round(), + "target node round should be strictly higher than the last bitmark round" ); Ok(()) @@ -552,9 +535,16 @@ impl FetchResponse { pub fn verify( self, _request: &RemoteFetchRequest, - _validator_verifier: &ValidatorVerifier, + validator_verifier: &ValidatorVerifier, ) -> anyhow::Result { - todo!("verification"); + ensure!( + self.certified_nodes + .iter() + .all(|node| node.verify(validator_verifier).is_ok()), + "unable to verify certified nodes" + ); + + Ok(self) } } @@ -659,3 +649,37 @@ impl TDAGMessage for TestAck { todo!() } } + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct DagSnapshotBitmask { + bitmask: Vec>, + first_round: Round, +} + +impl DagSnapshotBitmask { + pub fn new(first_round: Round, bitmask: Vec>) -> Self { + Self { + bitmask, + first_round, + } + } + + pub fn has(&self, round: Round, author_idx: usize) -> bool { + let round_idx = match round.checked_sub(self.first_round) { + Some(idx) => idx as usize, + None => return false, + }; + self.bitmask + .get(round_idx) + .and_then(|round| round.get(author_idx)) + .is_some() + } + + pub fn last_round(&self) -> Round { + self.first_round + self.bitmask.len() as Round - 1 + } + + pub fn first_round(&self) -> Round { + self.first_round + } +}