diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 07bb71c084..e0a9b41166 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -24,6 +24,7 @@ use crate::{ helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker}, spawn_blocking, }; +use aleo_std::StorageMode; use snarkos_account::Account; use snarkos_node_bft_events::{ BlockRequest, @@ -99,8 +100,6 @@ const MAX_CONNECTION_ATTEMPTS: usize = 10; /// The maximum interval to restrict a peer. const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds -/// The minimum number of validators to maintain a connection to. -const MIN_CONNECTED_VALIDATORS: usize = 175; /// The maximum number of validators to send in a validators response event. const MAX_VALIDATORS_TO_SEND: usize = 200; @@ -110,6 +109,9 @@ const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The amount of time an IP address is prohibited from connecting. const IP_BAN_TIME_IN_SECS: u64 = 300; +/// The name of the file containing cached validators. +const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers"; + /// Part of the Gateway API that deals with networking. /// This is a separate trait to allow for easier testing/mocking. #[async_trait] @@ -121,7 +123,17 @@ pub trait Transport: Send + Sync { /// The gateway maintains connections to other validators. /// For connections with clients and provers, the Router logic is used. #[derive(Clone)] -pub struct Gateway { +pub struct Gateway(Arc>); + +impl Deref for Gateway { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub struct InnerGateway { /// The account of the node. account: Account, /// The storage. @@ -131,21 +143,23 @@ pub struct Gateway { /// The TCP stack. tcp: Tcp, /// The cache. - cache: Arc>, + cache: Cache, /// The resolver. - resolver: Arc>>, + resolver: RwLock>, /// The collection of both candidate and connected peers. - peer_pool: Arc>>>, + peer_pool: RwLock>>, #[cfg(feature = "telemetry")] validator_telemetry: Telemetry, /// The primary sender. - primary_sender: Arc>>, + primary_sender: OnceCell>, /// The worker senders. - worker_senders: Arc>>>, + worker_senders: OnceCell>>, /// The sync sender. - sync_sender: Arc>>, + sync_sender: OnceCell>, /// The spawned handles. - handles: Arc>>>, + handles: Mutex>>, + /// The storage mode. + storage_mode: StorageMode, /// The development mode. dev: Option, } @@ -164,6 +178,7 @@ impl Gateway { ledger: Arc>, ip: Option, trusted_validators: &[SocketAddr], + storage_mode: StorageMode, dev: Option, ) -> Result { // Initialize the gateway IP. @@ -175,30 +190,37 @@ impl Gateway { // Initialize the TCP stack. let tcp = Tcp::new(Config::new(ip, Committee::::max_committee_size()?)); - // Add the trusted validators to the peer pool. - let initial_peers = trusted_validators - .iter() - .copied() - .map(|addr| (addr, Peer::new_candidate(addr, true))) - .collect::>(); + // Prepare the collection of the initial peers. + let mut initial_peers = HashMap::new(); + + // Load entries from the validator cache (if present). + let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?; + for addr in cached_peers { + initial_peers.insert(addr, Peer::new_candidate(addr, false)); + } + + // Add the trusted peers to the list of the initial peers; this may promote + // some of the cached validators to trusted ones. + initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true)))); // Return the gateway. - Ok(Self { + Ok(Self(Arc::new(InnerGateway { account, storage, ledger, tcp, cache: Default::default(), resolver: Default::default(), - peer_pool: Arc::new(RwLock::new(initial_peers)), + peer_pool: RwLock::new(initial_peers), #[cfg(feature = "telemetry")] validator_telemetry: Default::default(), primary_sender: Default::default(), worker_senders: Default::default(), sync_sender: Default::default(), handles: Default::default(), + storage_mode, dev, - }) + }))) } /// Run the gateway. @@ -304,12 +326,12 @@ impl CommunicationService for Gateway { impl Gateway { /// Returns the account of the node. - pub const fn account(&self) -> &Account { + pub fn account(&self) -> &Account { &self.account } /// Returns the dev identifier of the node. - pub const fn dev(&self) -> Option { + pub fn dev(&self) -> Option { self.dev } @@ -824,7 +846,7 @@ impl Gateway { self.cache.decrement_outbound_validators_requests(peer_ip); // If the number of connected validators is less than the minimum, connect to more validators. - if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS { + if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize { // Attempt to connect to any validators that are not already connected. let self_ = self.clone(); tokio::spawn(async move { @@ -929,6 +951,10 @@ impl Gateway { /// Shuts down the gateway. pub async fn shut_down(&self) { info!("Shutting down the gateway..."); + // Save the best peers for future use. + if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None) { + warn!("Failed to persist best validators to disk: {e}"); + } // Abort the tasks. self.handles.lock().iter().for_each(|handle| handle.abort()); // Close the listener. @@ -1045,9 +1071,15 @@ impl Gateway { /// This function sends a `ValidatorsRequest` to a random validator, /// if the number of connected validators is less than the minimum. + /// It also attempts to connect to known unconnected validators. fn handle_min_connected_validators(&self) { // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`. - if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS { + if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize { + for candidate_addr in self.candidate_peers() { + // Attempt to connect to unconnected validators. + self.connect(candidate_addr); + } + // Retrieve the connected validators. let validators = self.connected_peers(); // If there are no validator IPs to connect to, return early. @@ -1564,6 +1596,7 @@ mod prop_tests { gateway::prop_tests::GatewayAddress::{Dev, Prod}, helpers::{Storage, init_primary_channels, init_worker_channels}, }; + use aleo_std::StorageMode; use snarkos_account::Account; use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; @@ -1637,6 +1670,7 @@ mod prop_tests { storage.ledger().clone(), address.ip(), &[], + StorageMode::new_test(None), address.port(), ) .unwrap() @@ -1682,9 +1716,16 @@ mod prop_tests { let (storage, _, private_key, dev) = input; let account = Account::try_from(private_key).unwrap(); - let gateway = - Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()) - .unwrap(); + let gateway = Gateway::new( + account.clone(), + storage.clone(), + storage.ledger().clone(), + dev.ip(), + &[], + StorageMode::new_test(None), + dev.port(), + ) + .unwrap(); let tcp_config = gateway.tcp().config(); assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST))); assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap())); @@ -1699,9 +1740,16 @@ mod prop_tests { let (storage, _, private_key, dev) = input; let account = Account::try_from(private_key).unwrap(); - let gateway = - Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()) - .unwrap(); + let gateway = Gateway::new( + account.clone(), + storage.clone(), + storage.ledger().clone(), + dev.ip(), + &[], + StorageMode::new_test(None), + dev.port(), + ) + .unwrap(); let tcp_config = gateway.tcp().config(); if let Some(socket_addr) = dev.ip() { assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip())); @@ -1726,8 +1774,16 @@ mod prop_tests { let worker_storage = storage.clone(); let account = Account::try_from(private_key).unwrap(); - let gateway = - Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap(); + let gateway = Gateway::new( + account, + storage.clone(), + storage.ledger().clone(), + dev.ip(), + &[], + StorageMode::new_test(None), + dev.port(), + ) + .unwrap(); let (primary_sender, _) = init_primary_channels(); @@ -1791,8 +1847,16 @@ mod prop_tests { // Initialize the storage. let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); // Initialize the gateway. - let gateway = - Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap(); + let gateway = Gateway::new( + account.clone(), + storage.clone(), + ledger.clone(), + dev.ip(), + &[], + StorageMode::new_test(None), + dev.port(), + ) + .unwrap(); // Insert certificate to the storage. for certificate in certificates.iter() { storage.testing_only_insert_certificate_testing_only(certificate.clone()); diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index b5c688b810..045bb4b4cc 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -132,7 +132,8 @@ impl Primary { dev: Option, ) -> Result { // Initialize the gateway. - let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?; + let gateway = + Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.clone(), dev)?; // Initialize the sync module. let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync); @@ -2008,7 +2009,7 @@ mod tests { let account = accounts[account_index].1.clone(); let block_sync = Arc::new(BlockSync::new(ledger.clone())); let mut primary = - Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None), None).unwrap(); + Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::new_test(None), None).unwrap(); // Construct a worker instance. primary.workers = Arc::from([Worker::new( diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index ae492468de..894b266d78 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -1173,12 +1173,14 @@ mod tests { core_ledger.advance_to_next_block(&block_3)?; // Initialize the syncing ledger. + let storage_mode = StorageMode::new_test(None); let syncing_ledger = Arc::new(CoreLedgerService::new( - CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(), + CurrentLedger::load(genesis, storage_mode.clone()).unwrap(), Default::default(), )); // Initialize the gateway. - let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?; + let gateway = + Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], storage_mode, None)?; // Initialize the block synchronization logic. let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone())); // Initialize the sync module. diff --git a/node/bft/tests/common/utils.rs b/node/bft/tests/common/utils.rs index 8d18b20592..a269a41787 100644 --- a/node/bft/tests/common/utils.rs +++ b/node/bft/tests/common/utils.rs @@ -14,6 +14,7 @@ // limitations under the License. use crate::common::{CurrentNetwork, TranslucentLedgerService, primary}; +use aleo_std::StorageMode; use snarkos_account::Account; use snarkos_node_bft::{ Gateway, @@ -216,7 +217,7 @@ pub fn sample_gateway( ledger: Arc>>, ) -> Gateway { // Initialize the gateway. - Gateway::new(account, storage, ledger, None, &[], None).unwrap() + Gateway::new(account, storage, ledger, None, &[], StorageMode::new_test(None), None).unwrap() } /// Samples a new worker with the given ledger. diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 79d52e899c..9d5fc895d2 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -208,9 +208,39 @@ pub trait PeerPoolHandling: P2P { .collect() } + /// Loads any previously cached peer addresses so they can be introduced as initial + /// candidate peers to connect to. + fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result> { + let mut peer_cache_path = aleo_ledger_dir(N::ID, storage_mode); + peer_cache_path.push(filename); + + let peers = match fs::read_to_string(&peer_cache_path) { + Ok(cached_peers_str) => { + let mut cached_peers = Vec::new(); + for peer_addr_str in cached_peers_str.lines() { + match SocketAddr::from_str(peer_addr_str) { + Ok(addr) => cached_peers.push(addr), + Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"), + } + } + cached_peers + } + Err(error) if error.kind() == io::ErrorKind::NotFound => { + // Not an issue - the cache may not exist yet. + Vec::new() + } + Err(error) => { + warn!("Couldn't load cached peers at {}: {error}", peer_cache_path.display()); + Vec::new() + } + }; + + Ok(peers) + } + /// Preserve the peers who have the greatest known block heights, and the lowest /// number of registered network failures. - fn save_best_peers(&self, storage_mode: &StorageMode) -> Result<()> { + fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option) -> Result<()> { // Collect all prospect peers. let mut peers = self.get_peers(); @@ -227,11 +257,13 @@ pub trait PeerPoolHandling: P2P { (cmp::Reverse(peer.last_height_seen()), 0) } }); - peers.truncate(MAX_PEERS_TO_SEND); + if let Some(max) = max_entries { + peers.truncate(max); + } // Dump the connected peers to a file. let mut path = aleo_ledger_dir(N::ID, storage_mode); - path.push(PEER_CACHE_FILENAME); + path.push(filename); let mut file = fs::File::create(path)?; for peer in peers { writeln!(file, "{}", peer.listener_addr())?; @@ -361,24 +393,19 @@ impl Router { // Initialize the TCP stack. let tcp = Tcp::new(Config::new(node_ip, max_peers)); - // Add the trusted peers to the peer pool - let mut initial_peers = trusted_peers - .iter() - .copied() - .map(|addr| (addr, Peer::new_candidate(addr, true))) - .collect::>(); - - // Load additional peers from the peer cache (if present). - let mut peer_cache_path = aleo_ledger_dir(N::ID, &storage_mode); - peer_cache_path.push(PEER_CACHE_FILENAME); - if let Ok(cached_peers_str) = fs::read_to_string(&peer_cache_path) { - for peer_addr_str in cached_peers_str.lines() { - if let Ok(addr) = SocketAddr::from_str(peer_addr_str) { - initial_peers.insert(addr, Peer::new_candidate(addr, false)); - } - } + // Prepare the collection of the initial peers. + let mut initial_peers = HashMap::new(); + + // Load entries from the peer cache (if present). + let cached_peers = Self::load_cached_peers(&storage_mode, PEER_CACHE_FILENAME)?; + for addr in cached_peers { + initial_peers.insert(addr, Peer::new_candidate(addr, false)); } + // Add the trusted peers to the list of the initial peers; this may promote + // some of the cached peers to trusted ones. + initial_peers.extend(trusted_peers.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true)))); + // Initialize the router. Ok(Self(Arc::new(InnerRouter { tcp, @@ -629,7 +656,7 @@ impl Router { pub async fn shut_down(&self) { info!("Shutting down the router..."); // Save the best peers for future use. - if let Err(e) = self.save_best_peers(&self.storage_mode) { + if let Err(e) = self.save_best_peers(&self.storage_mode, PEER_CACHE_FILENAME, Some(MAX_PEERS_TO_SEND)) { warn!("Failed to persist best peers to disk: {e}"); } // Abort the tasks.