From 42e223c5f0a69b2e9370982121f6203742756b67 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 16 Aug 2022 19:40:06 +0000 Subject: [PATCH] reverts wide fanout in broadcast when the root node is down (#26359) A change included in https://github.com/solana-labs/solana/pull/20480 was that when the root node in turbine broadcast tree is down, the leader will broadcast the shred to all nodes in the first layer. The intention was to mitigate the impact of dead nodes on shreds propagation, because if the root node is down, then the entire cluster will miss out the shred. On the other hand, if x% of stake is down, this will cause 200*x% + 1 packets/shreds ratio at the broadcast stage which might contribute to line-rate saturation and packet drop. To avoid this bandwidth saturation issue, this commit reverts that logic and always broadcasts shreds from the leader only to the root node. As before we rely on erasure codes to recover shreds lost due to staked nodes being offline. (cherry picked from commit 3b87aa922720b151632bb38ee08b2622713822ff) --- core/src/broadcast_stage.rs | 20 +++---- .../broadcast_duplicates_run.rs | 21 +------ core/src/cluster_nodes.rs | 59 ++----------------- 3 files changed, 16 insertions(+), 84 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 18ab25a0b914c9..ba4c33fa38cc46 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -14,7 +14,10 @@ use { }, crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, itertools::Itertools, - solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}, + solana_gossip::{ + cluster_info::{ClusterInfo, ClusterInfoError}, + contact_info::ContactInfo, + }, solana_ledger::{blockstore::Blockstore, shred::Shred}, solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, @@ -32,7 +35,6 @@ use { }, std::{ collections::{HashMap, HashSet}, - iter::repeat, net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, @@ -390,8 +392,8 @@ fn update_peer_stats( } } -/// broadcast messages from the leader to layer 1 nodes -/// # Remarks +/// Broadcasts shreds from the leader (i.e. this node) to the root of the +/// turbine retransmit tree for each shred. pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], @@ -416,14 +418,10 @@ pub fn broadcast_shreds( let cluster_nodes = cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); update_peer_stats(&cluster_nodes, last_datapoint_submit); - let root_bank = root_bank.clone(); shreds.flat_map(move |shred| { - repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs( - &shred.id(), - &root_bank, - DATA_PLANE_FANOUT, - socket_addr_space, - )) + let node = cluster_nodes.get_broadcast_peer(&shred.id())?; + ContactInfo::is_valid_address(&node.tvu, socket_addr_space) + .then(|| (shred.payload(), node.tvu)) }) }) .collect(); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 741be826c44982..9e60d6c8196cfe 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -3,7 +3,7 @@ use { crate::cluster_nodes::ClusterNodesCache, itertools::Itertools, solana_entry::entry::Entry, - solana_gossip::cluster_info::DATA_PLANE_FANOUT, + solana_gossip::contact_info::ContactInfo, solana_ledger::shred::{ProcessShredsStats, Shredder}, solana_sdk::{ hash::Hash, @@ -270,12 +270,6 @@ impl BroadcastRun for BroadcastDuplicatesRun { (bank_forks.root_bank(), bank_forks.working_bank()) }; let self_pubkey = cluster_info.id(); - let nodes: Vec<_> = cluster_info - .all_peers() - .into_iter() - .map(|(node, _)| node) - .collect(); - // Create cluster partition. let cluster_partition: HashSet = { let mut cumilative_stake = 0; @@ -302,17 +296,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let addr = cluster_nodes - .get_broadcast_addrs( - &shred.id(), - &root_bank, - DATA_PLANE_FANOUT, - socket_addr_space, - ) - .first() - .copied()?; - let node = nodes.iter().find(|node| node.tvu == addr)?; - if !socket_addr_space.check(&node.tvu) { + let node = cluster_nodes.get_broadcast_peer(&shred.id())?; + if ContactInfo::is_valid_address(&node.tvu, socket_addr_space) { return None; } if self diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index f83175a9946f8d..22fcc882c07186 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -26,7 +26,7 @@ use { any::TypeId, cmp::Reverse, collections::HashMap, - iter::{once, repeat_with}, + iter::repeat_with, marker::PhantomData, net::SocketAddr, ops::Deref, @@ -114,62 +114,11 @@ impl ClusterNodes { new_cluster_nodes(cluster_info, stakes) } - pub(crate) fn get_broadcast_addrs( - &self, - shred: &ShredId, - root_bank: &Bank, - fanout: usize, - socket_addr_space: &SocketAddrSpace, - ) -> Vec { - const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60); + pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> { let shred_seed = shred.seed(&self.pubkey); let mut rng = ChaChaRng::from_seed(shred_seed); - let index = match self.weighted_shuffle.first(&mut rng) { - None => return Vec::default(), - Some(index) => index, - }; - if let Some(node) = self.nodes[index].contact_info() { - let now = timestamp(); - let age = Duration::from_millis(now.saturating_sub(node.wallclock)); - if age < MAX_CONTACT_INFO_AGE - && ContactInfo::is_valid_address(&node.tvu, socket_addr_space) - { - return vec![node.tvu]; - } - } - let mut rng = ChaChaRng::from_seed(shred_seed); - let nodes: Vec<&Node> = self - .weighted_shuffle - .clone() - .shuffle(&mut rng) - .map(|index| &self.nodes[index]) - .collect(); - if nodes.is_empty() { - return Vec::default(); - } - if drop_redundant_turbine_path(shred.slot(), root_bank) { - let peers = once(nodes[0]).chain(get_retransmit_peers(fanout, 0, &nodes)); - let addrs = peers.filter_map(Node::contact_info).map(|peer| peer.tvu); - return addrs - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) - .collect(); - } - let (neighbors, children) = compute_retransmit_peers(fanout, 0, &nodes); - neighbors[..1] - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu)) - .chain( - neighbors[1..] - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu_forwards)), - ) - .chain( - children - .iter() - .filter_map(|node| Some(node.contact_info()?.tvu)), - ) - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) - .collect() + let index = self.weighted_shuffle.first(&mut rng)?; + self.nodes[index].contact_info() } }