Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions display/src/pages/overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

use crate::{content_style, header_style};

use snarkos_node::{Node, router::Peer};
use snarkos_node::{
Node,
router::{Peer, PeerPoolHandling},
};
use snarkvm::prelude::Network;

use ratatui::{
Expand Down Expand Up @@ -79,7 +82,7 @@ impl Overview {
Peer::Connected(_) => "connected",
}.to_string();

let node_type = if let Some(node_type ) = peer.node_type() {
let node_type = if let Some(node_type) = peer.node_type() {
node_type.to_string()
} else {
"unknown".to_string()
Expand Down
3 changes: 3 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ features = [ "ledger", "prover" ]
workspace = true
features = [ "memory" ]

[dependencies.snarkos-node-router]
workspace = true

[dependencies.snarkos-node-sync]
workspace = true

Expand Down
243 changes: 118 additions & 125 deletions node/bft/src/gateway.rs

Large diffs are not rendered by default.

42 changes: 19 additions & 23 deletions node/bft/src/helpers/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@

use snarkvm::prelude::{Address, Network};

#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{collections::HashMap, net::SocketAddr};

#[derive(Debug)]
pub struct Resolver<N: Network> {
/// The map of the listener address to (ambiguous) peer address.
from_listener: RwLock<HashMap<SocketAddr, SocketAddr>>,
from_listener: HashMap<SocketAddr, SocketAddr>,
/// The map of the (ambiguous) peer address to listener address.
to_listener: RwLock<HashMap<SocketAddr, SocketAddr>>,
to_listener: HashMap<SocketAddr, SocketAddr>,
/// A map of `peer IP` to `address`.
peer_addresses: RwLock<HashMap<SocketAddr, Address<N>>>,
peer_addresses: HashMap<SocketAddr, Address<N>>,
/// A map of `address` to `peer IP`.
address_peers: RwLock<HashMap<Address<N>, SocketAddr>>,
address_peers: HashMap<Address<N>, SocketAddr>,
}

impl<N: Network> Default for Resolver<N> {
Expand All @@ -55,41 +51,41 @@ impl<N: Network> Resolver<N> {
impl<N: Network> Resolver<N> {
/// Returns the listener address for the given (ambiguous) peer address, if it exists.
pub fn get_listener(&self, peer_addr: SocketAddr) -> Option<SocketAddr> {
self.to_listener.read().get(&peer_addr).copied()
self.to_listener.get(&peer_addr).copied()
}

/// Returns the (ambiguous) peer address for the given listener address, if it exists.
pub fn get_ambiguous(&self, peer_ip: SocketAddr) -> Option<SocketAddr> {
self.from_listener.read().get(&peer_ip).copied()
self.from_listener.get(&peer_ip).copied()
}

/// Returns the address for the given peer IP.
pub fn get_address(&self, peer_ip: SocketAddr) -> Option<Address<N>> {
self.peer_addresses.read().get(&peer_ip).copied()
self.peer_addresses.get(&peer_ip).copied()
}

/// Returns the peer IP for the given address.
pub fn get_peer_ip_for_address(&self, address: Address<N>) -> Option<SocketAddr> {
self.address_peers.read().get(&address).copied()
self.address_peers.get(&address).copied()
}

/// Inserts a bidirectional mapping of the listener address and the (ambiguous) peer address,
/// alongside a bidirectional mapping of the listener address and the Aleo address.
pub fn insert_peer(&self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
self.from_listener.write().insert(listener_ip, peer_addr);
self.to_listener.write().insert(peer_addr, listener_ip);
self.peer_addresses.write().insert(listener_ip, address);
self.address_peers.write().insert(address, listener_ip);
pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
self.from_listener.insert(listener_ip, peer_addr);
self.to_listener.insert(peer_addr, listener_ip);
self.peer_addresses.insert(listener_ip, address);
self.address_peers.insert(address, listener_ip);
}

/// Removes the bidirectional mapping of the listener address and the (ambiguous) peer address,
/// alongside the bidirectional mapping of the listener address and the Aleo address.
pub fn remove_peer(&self, listener_ip: SocketAddr) {
if let Some(peer_addr) = self.from_listener.write().remove(&listener_ip) {
self.to_listener.write().remove(&peer_addr);
pub fn remove_peer(&mut self, listener_ip: SocketAddr) {
if let Some(peer_addr) = self.from_listener.remove(&listener_ip) {
self.to_listener.remove(&peer_addr);
}
if let Some(address) = self.peer_addresses.write().remove(&listener_ip) {
self.address_peers.write().remove(&address);
if let Some(address) = self.peer_addresses.remove(&listener_ip) {
self.address_peers.remove(&address);
}
}
}
Expand All @@ -103,7 +99,7 @@ mod tests {

#[test]
fn test_resolver() {
let resolver = Resolver::<CurrentNetwork>::new();
let mut resolver = Resolver::<CurrentNetwork>::new();
let listener_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let peer_addr = SocketAddr::from(([127, 0, 0, 1], 4321));
let mut rng = TestRng::default();
Expand Down
26 changes: 13 additions & 13 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl<N: Network> Primary<N> {
// Iterate through the non-signers.
for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
// Resolve the address to the peer IP.
match self.gateway.resolver().get_peer_ip_for_address(address) {
match self.gateway.resolver().read().get_peer_ip_for_address(address) {
// Resend the batch proposal to the validator for signing.
Some(peer_ip) => {
let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
Expand Down Expand Up @@ -730,7 +730,7 @@ impl<N: Network> Primary<N> {
let batch_author = batch_header.author();

// Ensure the batch proposal is from the validator.
match self.gateway.resolver().get_address(peer_ip) {
match self.gateway.resolver().read().get_address(peer_ip) {
// If the peer is a validator, then ensure the batch proposal is from the validator.
Some(address) => {
if address != batch_author {
Expand Down Expand Up @@ -1004,7 +1004,7 @@ impl<N: Network> Primary<N> {
let signer = signature.to_address();

// Ensure the batch signature is signed by the validator.
if self.gateway.resolver().get_address(peer_ip) != Some(signer) {
if self.gateway.resolver().read().get_address(peer_ip) != Some(signer) {
// Proceed to disconnect the validator.
self.gateway.disconnect(peer_ip);
bail!("Malicious peer - batch signature is from a different validator ({signer})");
Expand Down Expand Up @@ -1043,7 +1043,7 @@ impl<N: Network> Primary<N> {
// Retrieve the committee lookback for the round.
let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
// Retrieve the address of the validator.
let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
let Some(signer) = self_.gateway.resolver().read().get_address(peer_ip) else {
bail!("Signature is from a disconnected validator");
};
// Add the signature to the batch.
Expand Down Expand Up @@ -2229,7 +2229,7 @@ mod tests {
fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
// First account is primary, which doesn't need to resolve.
for (addr, acct) in accounts.iter().skip(1) {
primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
primary.gateway.resolver().write().insert_peer(*addr, *addr, acct.address());
}
}

Expand Down Expand Up @@ -2426,7 +2426,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// The primary will only consider itself synced if we received
// block locators from a peer.
Expand Down Expand Up @@ -2465,7 +2465,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Add a high block locator to indicate we are not synced.
primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap();
Expand Down Expand Up @@ -2505,7 +2505,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// The primary will only consider itself synced if we received
// block locators from a peer.
Expand Down Expand Up @@ -2542,7 +2542,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2587,7 +2587,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2643,7 +2643,7 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.try_block_sync().await;

Expand Down Expand Up @@ -2690,8 +2690,8 @@ mod tests {
}

// The author must be known to resolver to pass propose checks.
primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());
primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// primary v4 must be considered synced.
primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
Expand Down
1 change: 1 addition & 0 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_router::PeerPoolHandling;
use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators};
use snarkvm::{
console::{network::Network, types::Field},
Expand Down
7 changes: 4 additions & 3 deletions node/bft/tests/common/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use snarkos_node_bft::{
helpers::{PrimarySender, Storage, init_primary_channels},
};
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkos_node_router::PeerPoolHandling;
use snarkos_node_sync::BlockSync;
use snarkvm::{
console::{
Expand Down Expand Up @@ -120,7 +121,7 @@ impl TestValidator {
let self_clone = self.clone();
self.handles.lock().push(tokio::task::spawn(async move {
loop {
let connections = self_clone.primary.gateway().connected_peers().read().clone();
let connections = self_clone.primary.gateway().connected_peers();
info!("{} connections", connections.len());
for connection in connections {
debug!(" {}", connection);
Expand Down Expand Up @@ -285,7 +286,7 @@ impl TestNetwork {
// Disconnects N nodes from all other nodes.
pub async fn disconnect(&self, num_nodes: u16) {
for validator in self.validators.values().take(num_nodes as usize) {
for peer_ip in validator.primary.gateway().connected_peers().read().iter() {
for peer_ip in validator.primary.gateway().connected_peers().iter() {
validator.primary.gateway().disconnect(*peer_ip);
}
}
Expand All @@ -297,7 +298,7 @@ impl TestNetwork {
// Disconnects a specific node from all other nodes.
pub async fn disconnect_one(&self, id: u16) {
let target_validator = self.validators.get(&id).unwrap();
for peer_ip in target_validator.primary.gateway().connected_peers().read().iter() {
for peer_ip in target_validator.primary.gateway().connected_peers().iter() {
target_validator.primary.gateway().disconnect(*peer_ip);
}

Expand Down
78 changes: 5 additions & 73 deletions node/bft/tests/gateway_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::common::{
};
use snarkos_account::Account;
use snarkos_node_bft::{Gateway, helpers::init_primary_channels};
use snarkos_node_bft_events::{ChallengeRequest, ChallengeResponse, Disconnect, DisconnectReason, Event, WorkerPing};
use snarkos_node_bft_events::{ChallengeRequest, ChallengeResponse, Event};
use snarkos_node_router::PeerPoolHandling;
use snarkos_node_tcp::P2P;
use snarkvm::{ledger::narwhal::Data, prelude::TestRng};

Expand Down Expand Up @@ -78,79 +79,10 @@ async fn handshake_responder_side_timeout() {
deadline!(Duration::from_secs(5), move || gateway_clone.tcp().num_connecting() == 0);

// Check the test peer hasn't been added to the gateway's connected peers.
assert!(gateway.connected_peers().read().is_empty());
assert!(gateway.connected_peers().is_empty());
assert_eq!(gateway.tcp().num_connected(), 0);
}

// The test peer connects to the gateway and sends an unexpected event.
// The gateway's handshake should be interrupted and the peer should be
// disconnected.
macro_rules! handshake_responder_side_unexpected_event {
($test_name:ident, $payload:expr) => {
paste::paste! {
#[tokio::test(flavor = "multi_thread")]
async fn [<handshake_responder_side_unexpected_ $test_name>]() {
const NUM_NODES: u16 = 4;

let mut rng = TestRng::default();
let (_accounts, gateway) = new_test_gateway(NUM_NODES, &mut rng).await;
let test_peer = TestPeer::new().await;

// Initiate a connection with the gateway, this will only return once the handshake protocol has
// completed on the test peer's side, which is a no-op.
assert!(test_peer.connect(gateway.local_ip()).await.is_ok());

// Check the connection has been registered.
let gateway_clone = gateway.clone();
deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 1);

// Send an unexpected event.
let _ = test_peer.unicast(
gateway.local_ip(),
$payload
);

// Check the tcp stack's connection counts, make sure the disconnect interrupted handshaking,
// wait a short time to ensure the gateway has time to process the disconnect (note: this is
// shorter than the gateway's timeout, so we can ensure that's not the reason for the
// disconnect).
let gateway_clone = gateway.clone();
deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0);

// Check the test peer hasn't been added to the gateway's connected peers.
assert!(gateway.connected_peers().read().is_empty());
assert_eq!(gateway.tcp().num_connected(), 0);
}
}
};
}

/* Unexpected disconnects. */

macro_rules! handshake_responder_side_unexpected_disconnect {
($($reason:ident),*) => {
$(
paste::paste! {
handshake_responder_side_unexpected_event!(
[<disconnect_ $reason:snake>],
Event::Disconnect(Disconnect::from(DisconnectReason::$reason))
);
}
)*
}
}

handshake_responder_side_unexpected_disconnect!(
ProtocolViolation,
NoReasonGiven,
InvalidChallengeResponse,
OutdatedClientVersion
);

/* Other unexpected event types */

handshake_responder_side_unexpected_event!(worker_ping, Event::WorkerPing(WorkerPing::new([].into())));

// TODO(nkls): other event types, can be done as a follow up.

/* Invalid challenge request */
Expand Down Expand Up @@ -188,7 +120,7 @@ async fn handshake_responder_side_invalid_challenge_request() {
let gateway_clone = gateway.clone();
deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0);
// Check the test peer hasn't been added to the gateway's connected peers.
assert!(gateway.connected_peers().read().is_empty());
assert!(gateway.connected_peers().is_empty());
assert_eq!(gateway.tcp().num_connected(), 0);
}

Expand Down Expand Up @@ -268,6 +200,6 @@ async fn handshake_responder_side_invalid_challenge_response() {
let gateway_clone = gateway.clone();
deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0);
// Check the test peer hasn't been added to the gateway's connected peers.
assert!(gateway.connected_peers().read().is_empty());
assert!(gateway.connected_peers().is_empty());
assert_eq!(gateway.tcp().num_connected(), 0);
}
Loading