Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 97 additions & 33 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
spawn_blocking,
};
use aleo_std::StorageMode;
use snarkos_account::Account;
use snarkos_node_bft_events::{
BlockRequest,
Expand Down Expand Up @@ -99,8 +100,6 @@ const MAX_CONNECTION_ATTEMPTS: usize = 10;
/// The maximum interval to restrict a peer.
const RESTRICTED_INTERVAL: i64 = (MAX_CONNECTION_ATTEMPTS as u64 * MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds

/// The minimum number of validators to maintain a connection to.
const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

Expand All @@ -110,6 +109,9 @@ const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 300;

/// The name of the file containing cached validators.
const VALIDATOR_CACHE_FILENAME: &str = "cached_gateway_peers";

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
Expand All @@ -121,7 +123,17 @@ pub trait Transport<N: Network>: Send + Sync {
/// The gateway maintains connections to other validators.
/// For connections with clients and provers, the Router logic is used.
#[derive(Clone)]
pub struct Gateway<N: Network> {
pub struct Gateway<N: Network>(Arc<InnerGateway<N>>);

impl<N: Network> Deref for Gateway<N> {
type Target = Arc<InnerGateway<N>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

pub struct InnerGateway<N: Network> {
/// The account of the node.
account: Account<N>,
/// The storage.
Expand All @@ -131,21 +143,23 @@ pub struct Gateway<N: Network> {
/// The TCP stack.
tcp: Tcp,
/// The cache.
cache: Arc<Cache<N>>,
cache: Cache<N>,
/// The resolver.
resolver: Arc<RwLock<Resolver<N>>>,
resolver: RwLock<Resolver<N>>,
/// The collection of both candidate and connected peers.
peer_pool: Arc<RwLock<HashMap<SocketAddr, Peer<N>>>>,
peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
#[cfg(feature = "telemetry")]
validator_telemetry: Telemetry<N>,
/// The primary sender.
primary_sender: Arc<OnceCell<PrimarySender<N>>>,
primary_sender: OnceCell<PrimarySender<N>>,
/// The worker senders.
worker_senders: Arc<OnceCell<IndexMap<u8, WorkerSender<N>>>>,
worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
/// The sync sender.
sync_sender: Arc<OnceCell<SyncSender<N>>>,
sync_sender: OnceCell<SyncSender<N>>,
/// The spawned handles.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
handles: Mutex<Vec<JoinHandle<()>>>,
/// The storage mode.
storage_mode: StorageMode,
/// The development mode.
dev: Option<u16>,
}
Expand All @@ -164,6 +178,7 @@ impl<N: Network> Gateway<N> {
ledger: Arc<dyn LedgerService<N>>,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
storage_mode: StorageMode,
dev: Option<u16>,
) -> Result<Self> {
// Initialize the gateway IP.
Expand All @@ -175,30 +190,37 @@ impl<N: Network> Gateway<N> {
// Initialize the TCP stack.
let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));

// Add the trusted validators to the peer pool.
let initial_peers = trusted_validators
.iter()
.copied()
.map(|addr| (addr, Peer::new_candidate(addr, true)))
.collect::<HashMap<_, _>>();
// Prepare the collection of the initial peers.
let mut initial_peers = HashMap::new();

// Load entries from the validator cache (if present).
let cached_peers = Self::load_cached_peers(&storage_mode, VALIDATOR_CACHE_FILENAME)?;
for addr in cached_peers {
initial_peers.insert(addr, Peer::new_candidate(addr, false));
}

// Add the trusted peers to the list of the initial peers; this may promote
// some of the cached validators to trusted ones.
initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));

// Return the gateway.
Ok(Self {
Ok(Self(Arc::new(InnerGateway {
account,
storage,
ledger,
tcp,
cache: Default::default(),
resolver: Default::default(),
peer_pool: Arc::new(RwLock::new(initial_peers)),
peer_pool: RwLock::new(initial_peers),
#[cfg(feature = "telemetry")]
validator_telemetry: Default::default(),
primary_sender: Default::default(),
worker_senders: Default::default(),
sync_sender: Default::default(),
handles: Default::default(),
storage_mode,
dev,
})
})))
}

/// Run the gateway.
Expand Down Expand Up @@ -304,12 +326,12 @@ impl<N: Network> CommunicationService for Gateway<N> {

impl<N: Network> Gateway<N> {
/// Returns the account of the node.
pub const fn account(&self) -> &Account<N> {
pub fn account(&self) -> &Account<N> {
&self.account
}

/// Returns the dev identifier of the node.
pub const fn dev(&self) -> Option<u16> {
pub fn dev(&self) -> Option<u16> {
self.dev
}

Expand Down Expand Up @@ -824,7 +846,7 @@ impl<N: Network> Gateway<N> {
self.cache.decrement_outbound_validators_requests(peer_ip);

// If the number of connected validators is less than the minimum, connect to more validators.
if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
// Attempt to connect to any validators that are not already connected.
let self_ = self.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -929,6 +951,10 @@ impl<N: Network> Gateway<N> {
/// Shuts down the gateway.
pub async fn shut_down(&self) {
info!("Shutting down the gateway...");
// Save the best peers for future use.
if let Err(e) = self.save_best_peers(&self.storage_mode, VALIDATOR_CACHE_FILENAME, None) {
warn!("Failed to persist best validators to disk: {e}");
}
// Abort the tasks.
self.handles.lock().iter().for_each(|handle| handle.abort());
// Close the listener.
Expand Down Expand Up @@ -1045,9 +1071,15 @@ impl<N: Network> Gateway<N> {

/// This function sends a `ValidatorsRequest` to a random validator,
/// if the number of connected validators is less than the minimum.
/// It also attempts to connect to known unconnected validators.
fn handle_min_connected_validators(&self) {
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS {
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
for candidate_addr in self.candidate_peers() {
// Attempt to connect to unconnected validators.
self.connect(candidate_addr);
}

// Retrieve the connected validators.
let validators = self.connected_peers();
// If there are no validator IPs to connect to, return early.
Expand Down Expand Up @@ -1564,6 +1596,7 @@ mod prop_tests {
gateway::prop_tests::GatewayAddress::{Dev, Prod},
helpers::{Storage, init_primary_channels, init_worker_channels},
};
use aleo_std::StorageMode;
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
Expand Down Expand Up @@ -1637,6 +1670,7 @@ mod prop_tests {
storage.ledger().clone(),
address.ip(),
&[],
StorageMode::new_test(None),
address.port(),
)
.unwrap()
Expand Down Expand Up @@ -1682,9 +1716,16 @@ mod prop_tests {
let (storage, _, private_key, dev) = input;
let account = Account::try_from(private_key).unwrap();

let gateway =
Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
.unwrap();
let gateway = Gateway::new(
account.clone(),
storage.clone(),
storage.ledger().clone(),
dev.ip(),
&[],
StorageMode::new_test(None),
dev.port(),
)
.unwrap();
let tcp_config = gateway.tcp().config();
assert_eq!(tcp_config.listener_ip, Some(IpAddr::V4(Ipv4Addr::LOCALHOST)));
assert_eq!(tcp_config.desired_listening_port, Some(MEMORY_POOL_PORT + dev.port().unwrap()));
Expand All @@ -1699,9 +1740,16 @@ mod prop_tests {
let (storage, _, private_key, dev) = input;
let account = Account::try_from(private_key).unwrap();

let gateway =
Gateway::new(account.clone(), storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port())
.unwrap();
let gateway = Gateway::new(
account.clone(),
storage.clone(),
storage.ledger().clone(),
dev.ip(),
&[],
StorageMode::new_test(None),
dev.port(),
)
.unwrap();
let tcp_config = gateway.tcp().config();
if let Some(socket_addr) = dev.ip() {
assert_eq!(tcp_config.listener_ip, Some(socket_addr.ip()));
Expand All @@ -1726,8 +1774,16 @@ mod prop_tests {
let worker_storage = storage.clone();
let account = Account::try_from(private_key).unwrap();

let gateway =
Gateway::new(account, storage.clone(), storage.ledger().clone(), dev.ip(), &[], dev.port()).unwrap();
let gateway = Gateway::new(
account,
storage.clone(),
storage.ledger().clone(),
dev.ip(),
&[],
StorageMode::new_test(None),
dev.port(),
)
.unwrap();

let (primary_sender, _) = init_primary_channels();

Expand Down Expand Up @@ -1791,8 +1847,16 @@ mod prop_tests {
// Initialize the storage.
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
// Initialize the gateway.
let gateway =
Gateway::new(account.clone(), storage.clone(), ledger.clone(), dev.ip(), &[], dev.port()).unwrap();
let gateway = Gateway::new(
account.clone(),
storage.clone(),
ledger.clone(),
dev.ip(),
&[],
StorageMode::new_test(None),
dev.port(),
)
.unwrap();
// Insert certificate to the storage.
for certificate in certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
Expand Down
5 changes: 3 additions & 2 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl<N: Network> Primary<N> {
dev: Option<u16>,
) -> Result<Self> {
// Initialize the gateway.
let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
let gateway =
Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.clone(), dev)?;
// Initialize the sync module.
let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

Expand Down Expand Up @@ -2008,7 +2009,7 @@ mod tests {
let account = accounts[account_index].1.clone();
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
let mut primary =
Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None), None).unwrap();
Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::new_test(None), None).unwrap();

// Construct a worker instance.
primary.workers = Arc::from([Worker::new(
Expand Down
6 changes: 4 additions & 2 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,12 +1173,14 @@ mod tests {
core_ledger.advance_to_next_block(&block_3)?;

// Initialize the syncing ledger.
let storage_mode = StorageMode::new_test(None);
let syncing_ledger = Arc::new(CoreLedgerService::new(
CurrentLedger::load(genesis, StorageMode::new_test(None)).unwrap(),
CurrentLedger::load(genesis, storage_mode.clone()).unwrap(),
Default::default(),
));
// Initialize the gateway.
let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
let gateway =
Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], storage_mode, None)?;
// Initialize the block synchronization logic.
let block_sync = Arc::new(BlockSync::new(syncing_ledger.clone()));
// Initialize the sync module.
Expand Down
3 changes: 2 additions & 1 deletion node/bft/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

use crate::common::{CurrentNetwork, TranslucentLedgerService, primary};
use aleo_std::StorageMode;
use snarkos_account::Account;
use snarkos_node_bft::{
Gateway,
Expand Down Expand Up @@ -216,7 +217,7 @@ pub fn sample_gateway<N: Network>(
ledger: Arc<TranslucentLedgerService<N, ConsensusMemory<N>>>,
) -> Gateway<N> {
// Initialize the gateway.
Gateway::new(account, storage, ledger, None, &[], None).unwrap()
Gateway::new(account, storage, ledger, None, &[], StorageMode::new_test(None), None).unwrap()
}

/// Samples a new worker with the given ledger.
Expand Down
Loading