Skip to content

Commit

Permalink
attempt #2
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jul 16, 2023
1 parent 710490b commit fa7d892
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 131 deletions.
69 changes: 34 additions & 35 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_types::epoch_state::EpochState;
use std::{sync::Arc, time::Duration};
use std::{ops::Deref, sync::Arc, time::Duration};
use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -91,20 +91,9 @@ impl DagFetcher {
local_request.notify();
continue;
}

let (lowest_round, bitmask, missing_nodes) =
if let Some((lowest_round, bitmask, missing_nodes)) = dag_reader.bitmask() {
(lowest_round, bitmask, missing_nodes)
} else {
error!("Incomplete round not found, but fetch request received");
continue;
};

RemoteFetchRequest::new(
local_request.node().metadata().clone(),
lowest_round,
bitmask,
missing_nodes,
dag_reader.bitmask(local_request.node().round()),
)
};
let network_request = DAGMessage::from(remote_request.clone()).into_network_message();
Expand All @@ -117,31 +106,32 @@ impl DagFetcher {
.and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
{
let ceritified_nodes = response.certified_nodes();
if ceritified_nodes.len() != remote_request.missing_count() {
error!(
"expected {} nodes, received {}",
remote_request.missing_count(),
ceritified_nodes.len()
);
continue;
}
// TODO: support chunk response or fallback to state sync
let mut dag_writer = self.dag.write();
for node in ceritified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
{
let mut dag_writer = self.dag.write();
for node in ceritified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
}
}
local_request.notify();

if self.dag.read().all_exists(local_request.node().parents()) {
local_request.notify();
} else {
// TODO: implement retry logic
}
}
}
}
}

#[derive(Debug, ThisError)]
pub enum FetchHandleError {
#[error("not enough nodes to satisfy request")]
NotEnoughNodes,
#[error("target node is not present")]
TargetNotPresent,
#[error("causal parents not present")]
NotPresent,
}

pub struct FetchHandler {
Expand All @@ -161,16 +151,25 @@ impl RpcHandler for FetchHandler {
fn process(&mut self, message: Self::Request) -> anyhow::Result<Self::Response> {
let dag_reader = self.dag.read();

let nodes = dag_reader.get_missing_nodes(message.start_round(), message.exists_bitmask());
// If this peer cannot satisfy the request, return an error.
ensure!(
nodes.len() == message.missing_count(),
FetchHandleError::NotEnoughNodes
);
let target_node = dag_reader
.get_node(message.target())
.ok_or(FetchHandleError::TargetNotPresent)?;

let certified_nodes: Vec<_> = dag_reader
.reachable(
&target_node,
Some(message.exists_bitmask().first_round()),
|_| true,
)
.skip(1) // Skip target node
.map(|node_status| node_status.as_node().clone().deref().clone())
.collect();

ensure!(certified_nodes.len() > 0, FetchHandleError::NotPresent);

Ok(FetchResponse::new(
message.target().epoch(),
nodes.iter().map(|n| n.as_ref().clone()).collect(),
certified_nodes,
))
}
}
70 changes: 15 additions & 55 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::types::{DagSnapshotBitmask, NodeMetadata};
use crate::dag::{
storage::DAGStorage,
types::{CertifiedNode, NodeCertificate, NodeMetadata},
types::{CertifiedNode, NodeCertificate},
};
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::{Author, Round};
Expand Down Expand Up @@ -217,6 +218,7 @@ impl Dag {
&self,
from: &Arc<CertifiedNode>,
until: Option<Round>,
filter: impl Fn(&NodeStatus) -> bool,
) -> impl Iterator<Item = &NodeStatus> {
let until = until.unwrap_or(self.lowest_round());
let mut reachable_filter = Self::reachable_filter(from.digest());
Expand All @@ -226,45 +228,10 @@ impl Dag {
.flat_map(|(_, round_ref)| round_ref.iter())
.flatten()
.filter(move |node_status| {
matches!(node_status, NodeStatus::Unordered(_))
&& reachable_filter(node_status.as_node())
filter(node_status) && reachable_filter(node_status.as_node())
})
}

pub fn get_missing_nodes(
&self,
start_round: Round,
bitmask: &[Vec<bool>],
) -> Vec<Arc<CertifiedNode>> {
bitmask
.iter()
.enumerate()
.flat_map(move |(round_idx, round)| {
round
.iter()
.enumerate()
.filter_map(move |(author_idx, exists)| {
if *exists {
None
} else {
Some((start_round + (round_idx as u64), author_idx))
}
})
})
.filter_map(|(round, author_idx)| {
if let Some(Some(node)) = self
.nodes_by_round
.get(&round)
.and_then(|round| round.get(author_idx))
{
Some(node.clone())
} else {
None
}
})
.collect()
}

pub fn get_strong_links_for_round(
&self,
round: Round,
Expand Down Expand Up @@ -296,30 +263,23 @@ impl Dag {
None
}

pub fn bitmask(&self) -> Option<(Round, Vec<Vec<bool>>, usize)> {
pub fn bitmask(&self, target_round: Round) -> DagSnapshotBitmask {
let lowest_round = match self.lowest_incomplete_round() {
Some(lowest_round) => lowest_round,
None => return None,
Some(round) => round,
None => {
return DagSnapshotBitmask::new(self.highest_round() + 1, vec![vec![
false;
self.author_to_index.len()
]]);
},
};

let mut missing_count = 0;
let bitmask = self
.nodes_by_round
.iter()
.skip_while(|(round, _)| **round < lowest_round)
.map(|(_, round_nodes)| {
round_nodes
.iter()
.map(|node| {
if node.is_none() {
missing_count += 1;
}
node.is_some()
})
.collect()
})
.range(lowest_round..target_round)
.map(|(_, round_nodes)| round_nodes.iter().map(|node| node.is_some()).collect())
.collect();

Some((lowest_round, bitmask, missing_count))
DagSnapshotBitmask::new(lowest_round, bitmask)
}
}
7 changes: 6 additions & 1 deletion consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::dag_store::NodeStatus;
use crate::{
dag::{anchor_election::AnchorElection, dag_store::Dag, types::NodeMetadata, CertifiedNode},
experimental::buffer_manager::OrderedBlocks,
Expand Down Expand Up @@ -104,7 +105,11 @@ impl OrderRule {
&& *metadata.author() == self.anchor_election.get_anchor(metadata.round())
};
while let Some(prev_anchor) = dag_reader
.reachable(&current_anchor, Some(self.lowest_unordered_anchor_round))
.reachable(
&current_anchor,
Some(self.lowest_unordered_anchor_round),
|node_status| matches!(node_status, NodeStatus::Unordered(_)),
)
.map(|node_status| node_status.as_node())
.find(|node| is_anchor(node.metadata()))
{
Expand Down
33 changes: 17 additions & 16 deletions consensus/src/dag/tests/fetcher_test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright © Aptos Foundation

use super::{dag_test::MockStorage, helpers::new_node};
use super::dag_test::MockStorage;
use crate::dag::{
dag_fetcher::FetchHandler,
dag_store::Dag,
tests::helpers::new_certified_node,
types::{FetchResponse, RemoteFetchRequest},
types::{DagSnapshotBitmask, FetchResponse, RemoteFetchRequest},
RpcHandler,
};
use aptos_infallible::RwLock;
Expand Down Expand Up @@ -34,26 +34,27 @@ fn test_dag_fetcher_receiver() {
first_round_nodes.push(node);
}

let target_node = new_node(2, 100, signers[0].author(), vec![]);
// Round 2 - node 0
let target_node = new_certified_node(2, signers[0].author(), vec![
first_round_nodes[0].certificate()
]);

let request =
RemoteFetchRequest::new(target_node.metadata().clone(), 1, vec![vec![false; 4]], 4);
let request = RemoteFetchRequest::new(
target_node.metadata().clone(),
DagSnapshotBitmask::new(1, vec![vec![false; 4]]),
);
assert_eq!(
fetcher.process(request).unwrap_err().to_string(),
"not enough nodes to satisfy request"
fetcher.process(request.clone()).unwrap_err().to_string(),
"target node is not present"
);

// Round 1 - node 3
{
let node = new_certified_node(1, signers[3].author(), vec![]);
assert!(dag.write().add_node(node.clone()).is_ok());
first_round_nodes.push(node);
}
// Add Round 2 - node 0
assert!(dag.write().add_node(target_node.clone()).is_ok());

let request =
RemoteFetchRequest::new(target_node.metadata().clone(), 1, vec![vec![false; 4]], 4);
assert_ok_eq!(
fetcher.process(request),
FetchResponse::new(0, first_round_nodes)
FetchResponse::new(1, vec![first_round_nodes[0].clone()])
);
}

// TODO: add more tests after commit rule tests
Loading

0 comments on commit fa7d892

Please sign in to comment.