diff --git a/bloom/Cargo.toml b/bloom/Cargo.toml index 250450e69763cf..bafa976eb2f217 100644 --- a/bloom/Cargo.toml +++ b/bloom/Cargo.toml @@ -7,7 +7,7 @@ authors = { workspace = true } repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } -edition = { workspace = true } +edition = "2024" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/bloom/benches/bloom.rs b/bloom/benches/bloom.rs index afd6ac896b0f17..dd3ccc1e1470d0 100644 --- a/bloom/benches/bloom.rs +++ b/bloom/benches/bloom.rs @@ -1,7 +1,7 @@ #![allow(clippy::arithmetic_side_effects)] use { - bencher::{benchmark_group, benchmark_main, Bencher}, + bencher::{Bencher, benchmark_group, benchmark_main}, bv::BitVec, fnv::FnvHasher, rand::Rng, diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 2ef418357095a4..98d19c9ec9ad1c 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -7,7 +7,7 @@ description = { workspace = true } repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } -edition = { workspace = true } +edition = "2024" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/gossip/benches/crds.rs b/gossip/benches/crds.rs index e1f29edbc8839f..afbdc73ffc3d73 100644 --- a/gossip/benches/crds.rs +++ b/gossip/benches/crds.rs @@ -1,10 +1,10 @@ use { - criterion::{criterion_group, criterion_main, Criterion}, - rand::{rng, Rng}, + criterion::{Criterion, criterion_group, criterion_main}, + rand::{Rng, rng}, rayon::ThreadPoolBuilder, solana_gossip::{ crds::{Crds, GossipRoute}, - crds_gossip_pull::{CrdsTimeouts, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_pull::{CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, CrdsTimeouts}, crds_value::CrdsValue, }, solana_pubkey::Pubkey, diff --git a/gossip/benches/crds_gossip_pull.rs b/gossip/benches/crds_gossip_pull.rs index b8c4e82c80f506..d3b8e8670fc82f 100644 --- a/gossip/benches/crds_gossip_pull.rs +++ b/gossip/benches/crds_gossip_pull.rs @@ -1,6 +1,6 @@ use { - criterion::{criterion_group, criterion_main, Criterion}, - rand::{rng, Rng}, + criterion::{Criterion, criterion_group, criterion_main}, + rand::{Rng, rng}, rayon::ThreadPoolBuilder, solana_gossip::{ crds::{Crds, GossipRoute}, diff --git a/gossip/benches/crds_shards.rs b/gossip/benches/crds_shards.rs index 10fb19cccc39d6..e7ee10cb0aaa5a 100644 --- a/gossip/benches/crds_shards.rs +++ b/gossip/benches/crds_shards.rs @@ -1,6 +1,6 @@ use { - criterion::{criterion_group, criterion_main, Criterion}, - rand::{rng, Rng}, + criterion::{Criterion, criterion_group, criterion_main}, + rand::{Rng, rng}, solana_gossip::{ crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_shards::CrdsShards, diff --git a/gossip/benches/weighted_shuffle.rs b/gossip/benches/weighted_shuffle.rs index e3c28721162710..80e328ded89e11 100644 --- a/gossip/benches/weighted_shuffle.rs +++ b/gossip/benches/weighted_shuffle.rs @@ -1,5 +1,5 @@ use { - criterion::{criterion_group, criterion_main, Criterion}, + criterion::{Criterion, criterion_group, criterion_main}, rand::{Rng, SeedableRng}, rand_chacha::ChaChaRng, solana_gossip::weighted_shuffle::WeightedShuffle, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 1f690994634859..bcc8d81b4969fe 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -18,13 +18,13 @@ use { cluster_info_metrics::{Counter, GossipStats, ScopedTimer, TimedGuard}, contact_info::{self, ContactInfo, ContactInfoQuery, Error as ContactInfoError}, crds::{Crds, Cursor, GossipRoute}, - crds_data::{self, CrdsData, EpochSlotsIndex, LowestSlot, SnapshotHashes, Vote, MAX_VOTES}, - crds_filter::{should_retain_crds_value, GossipFilterDirection}, + crds_data::{self, CrdsData, EpochSlotsIndex, LowestSlot, MAX_VOTES, SnapshotHashes, Vote}, + crds_filter::{GossipFilterDirection, should_retain_crds_value}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{ - get_max_bloom_filter_bytes, CrdsFilter, CrdsTimeouts, ProcessPullStats, PullRequest, - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, CrdsFilter, CrdsTimeouts, ProcessPullStats, + PullRequest, get_max_bloom_filter_bytes, }, crds_value::{CrdsValue, CrdsValueLabel}, duplicate_shred::DuplicateShred, @@ -33,10 +33,10 @@ use { gossip_error::GossipError, ping_pong::Pong, protocol::{ - split_gossip_messages, Ping, PingCache, Protocol, PruneData, DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MAX_PAYLOAD_SIZE, - PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE, + PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE, Ping, PingCache, + Protocol, PruneData, split_gossip_messages, }, restart_crds_values::{ RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError, @@ -46,17 +46,16 @@ use { arc_swap::ArcSwap, crossbeam_channel::{Receiver, TrySendError}, itertools::{Either, Itertools}, - rand::{prelude::IndexedMutRandom, CryptoRng, Rng}, - rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, - solana_clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH}, + rand::{CryptoRng, Rng, prelude::IndexedMutRandom}, + rayon::{ThreadPool, ThreadPoolBuilder, prelude::*}, + solana_clock::{DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH, Slot}, solana_hash::Hash, - solana_keypair::{signable::Signable, Keypair}, + solana_keypair::{Keypair, signable::Signable}, solana_ledger::shred::Shred, solana_net_utils::{ - bind_in_range, + PortRange, SocketAddrSpace, VALIDATOR_PORT_RANGE, bind_in_range, multihomed_sockets::BindIpAddrs, sockets::{bind_gossip_port_in_range, bind_to_localhost_unique}, - PortRange, SocketAddrSpace, VALIDATOR_PORT_RANGE, }, solana_perf::{ data_budget::DataBudget, @@ -89,10 +88,10 @@ use { rc::Rc, result::Result, sync::{ - atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, RwLockReadGuard, + atomic::{AtomicBool, Ordering}, }, - thread::{sleep, Builder, JoinHandle}, + thread::{Builder, JoinHandle, sleep}, time::{Duration, Instant}, }, thiserror::Error, @@ -1205,15 +1204,14 @@ impl ClusterInfo { return Either::Left(pulls); } entrypoint.set_wallclock(now); - if let Some(entrypoint_gossip) = entrypoint.gossip() { - if self + if let Some(entrypoint_gossip) = entrypoint.gossip() + && self .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) .get_nodes_contact_info() .any(|node| node.gossip() == Some(entrypoint_gossip)) - { - // Found the entrypoint, no need to pull from it. - return Either::Left(pulls); - } + { + // Found the entrypoint, no need to pull from it. + return Either::Left(pulls); } } let Some(entrypoint) = entrypoint.gossip() else { @@ -1378,12 +1376,12 @@ impl ClusterInfo { ) .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats)) .for_each(|pkt| packet_batch.push(pkt)); - if !packet_batch.is_empty() { - if let Err(TrySendError::Full(packet_batch)) = sender.try_send(packet_batch.into()) { - self.stats - .gossip_transmit_packets_dropped_count - .add_relaxed(packet_batch.len() as u64); - } + if !packet_batch.is_empty() + && let Err(TrySendError::Full(packet_batch)) = sender.try_send(packet_batch.into()) + { + self.stats + .gossip_transmit_packets_dropped_count + .add_relaxed(packet_batch.len() as u64); } self.stats .gossip_transmit_loop_iterations_since_last_report @@ -1610,13 +1608,12 @@ impl ClusterInfo { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time); if !requests.is_empty() { let response = self.handle_pull_requests(thread_pool, recycler, requests, stakes); - if !response.is_empty() { - if let Err(TrySendError::Full(response)) = response_sender.try_send(response.into()) - { - self.stats - .gossip_packets_dropped_count - .add_relaxed(response.len() as u64); - } + if !response.is_empty() + && let Err(TrySendError::Full(response)) = response_sender.try_send(response.into()) + { + self.stats + .gossip_packets_dropped_count + .add_relaxed(response.len() as u64); } } } @@ -1892,14 +1889,13 @@ impl ClusterInfo { self.new_push_requests(stakes) .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats)) .for_each(|pkt| packet_batch.push(pkt)); - if !packet_batch.is_empty() { - if let Err(TrySendError::Full(packet_batch)) = + if !packet_batch.is_empty() + && let Err(TrySendError::Full(packet_batch)) = response_sender.try_send(packet_batch.into()) - { - self.stats - .gossip_packets_dropped_count - .add_relaxed(packet_batch.len() as u64); - } + { + self.stats + .gossip_packets_dropped_count + .add_relaxed(packet_batch.len() as u64); } } @@ -2471,7 +2467,7 @@ fn discard_different_shred_version( Protocol::PullRequest(..) => return, // No CRDS values in Prune, Ping and Pong messages. Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => { - return + return; } }; let num_values = values.len(); @@ -2593,7 +2589,7 @@ mod tests { solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS, solana_vote_program::{ vote_instruction, - vote_state::{Vote, MAX_LOCKOUT_HISTORY}, + vote_state::{MAX_LOCKOUT_HISTORY, Vote}, }, std::{ iter::repeat_with, @@ -3236,12 +3232,14 @@ mod tests { tower.clear(); tower.extend(0..=slot); let vote = new_vote_transaction(vec![slot]); - assert!(panic::catch_unwind(|| cluster_info.push_vote(&tower, vote)) - .err() - .and_then(|a| a - .downcast_ref::() - .map(|s| { s.starts_with("Submitting old vote") })) - .unwrap_or_default()); + assert!( + panic::catch_unwind(|| cluster_info.push_vote(&tower, vote)) + .err() + .and_then(|a| a + .downcast_ref::() + .map(|s| { s.starts_with("Submitting old vote") })) + .unwrap_or_default() + ); } #[test] @@ -3273,9 +3271,11 @@ mod tests { { let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); for entry in entries { - assert!(gossip_crds - .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); + assert!( + gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok() + ); } } // Should exclude other node's epoch-slot because of different @@ -3410,9 +3410,11 @@ mod tests { let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); - assert!(pulls - .into_iter() - .all(|(addr, _)| addr == other_node.gossip().unwrap())); + assert!( + pulls + .into_iter() + .all(|(addr, _)| addr == other_node.gossip().unwrap()) + ); // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests @@ -3434,9 +3436,11 @@ mod tests { let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); - assert!(pulls - .into_iter() - .all(|(addr, _)| addr == other_node.gossip().unwrap())); + assert!( + pulls + .into_iter() + .all(|(addr, _)| addr == other_node.gossip().unwrap()) + ); } #[test] @@ -3591,9 +3595,11 @@ mod tests { let leader = Arc::new(Keypair::new()); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - assert!(cluster_info - .push_duplicate_shred(&shred1, shred2.payload()) - .is_ok()); + assert!( + cluster_info + .push_duplicate_shred(&shred1, shred2.payload()) + .is_ok() + ); cluster_info.flush_push_queue(); let entries = cluster_info.get_duplicate_shreds(&mut cursor); // One duplicate shred proof is split into 3 chunks. @@ -3609,9 +3615,11 @@ mod tests { let next_shred_index = 354; let shred3 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let shred4 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - assert!(cluster_info - .push_duplicate_shred(&shred3, shred4.payload()) - .is_ok()); + assert!( + cluster_info + .push_duplicate_shred(&shred3, shred4.payload()) + .is_ok() + ); cluster_info.flush_push_queue(); let entries1 = cluster_info.get_duplicate_shreds(&mut cursor); // One duplicate shred proof is split into 3 chunks. @@ -3636,9 +3644,11 @@ mod tests { update.push(i * 1050 + j); } } - assert!(cluster_info - .push_restart_last_voted_fork_slots(&update, Hash::default()) - .is_ok()); + assert!( + cluster_info + .push_restart_last_voted_fork_slots(&update, Hash::default()) + .is_ok() + ); cluster_info.flush_push_queue(); let mut cursor = Cursor::default(); @@ -3665,9 +3675,11 @@ mod tests { { let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); for entry in entries { - assert!(gossip_crds - .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); + assert!( + gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok() + ); } } // Should exclude other node's last-voted-fork-slot because of different @@ -3681,9 +3693,11 @@ mod tests { let mut node = cluster_info.my_contact_info.write().unwrap(); node.set_shred_version(42); } - assert!(cluster_info - .push_restart_last_voted_fork_slots(&update, Hash::default()) - .is_ok()); + assert!( + cluster_info + .push_restart_last_voted_fork_slots(&update, Hash::default()) + .is_ok() + ); cluster_info.flush_push_queue(); // Should now include both slots. let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); @@ -3742,9 +3756,11 @@ mod tests { { let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); for entry in entries { - assert!(gossip_crds - .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); + assert!( + gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok() + ); } } // Should exclude other node's heaviest_fork because of different @@ -3884,9 +3900,10 @@ mod tests { )), &keypair2, ); - assert!(crds - .insert(ci_wrong_pubkey, /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); + assert!( + crds.insert(ci_wrong_pubkey, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok() + ); // Test insert EpochSlot w/ previous ContactInfo w/ matching shred version but different pubkey -> should be rejected let epoch_slots = EpochSlots::new_rand(&mut rng, Some(keypair.pubkey())); @@ -3898,9 +3915,10 @@ mod tests { } // Now insert ContactInfo with same pubkey as EpochSlot - assert!(crds - .insert(ci.clone(), /*now=*/ 0, GossipRoute::LocalMessage) - .is_ok()); + assert!( + crds.insert(ci.clone(), /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok() + ); let mut msg = Protocol::PushMessage(keypair.pubkey(), vec![es]); discard_different_shred_version(&mut msg, self_shred_version, &crds, &stats); diff --git a/gossip/src/contact_info.rs b/gossip/src/contact_info.rs index d5bf67f80a7f8e..9b5543e034abf2 100644 --- a/gossip/src/contact_info.rs +++ b/gossip/src/contact_info.rs @@ -681,9 +681,7 @@ macro_rules! socketaddr { ($ip:expr, $port:expr) => { std::net::SocketAddr::from((std::net::Ipv4Addr::from($ip), $port)) }; - ($str:expr) => {{ - $str.parse::().unwrap() - }}; + ($str:expr) => {{ $str.parse::().unwrap() }}; } #[macro_export] @@ -698,8 +696,8 @@ mod tests { use { super::*, rand::{ - prelude::{IndexedRandom as _, SliceRandom as _}, Rng, + prelude::{IndexedRandom as _, SliceRandom as _}, }, solana_keypair::Keypair, solana_signer::Signer, @@ -959,19 +957,22 @@ mod tests { sockets.values().map(SocketAddr::ip).collect::>(), ); // Assert that all sockets reference a valid IP address. - assert!(node - .sockets - .iter() - .map(|entry| node.addrs.get(usize::from(entry.index))) - .all(|addr| addr.is_some())); - // Assert that port offsets don't overflow. - assert!(u16::try_from( + assert!( node.sockets .iter() - .map(|entry| u64::from(entry.offset)) - .sum::() - ) - .is_ok()); + .map(|entry| node.addrs.get(usize::from(entry.index))) + .all(|addr| addr.is_some()) + ); + // Assert that port offsets don't overflow. + assert!( + u16::try_from( + node.sockets + .iter() + .map(|entry| u64::from(entry.offset)) + .sum::() + ) + .is_ok() + ); // Assert that serde round trips. let bytes = bincode::serialize(&node).unwrap(); let other: ContactInfo = bincode::deserialize(&bytes).unwrap(); diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index abe180b75eb096..2d2d75f2401c94 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -37,17 +37,17 @@ use { }, assert_matches::debug_assert_matches, indexmap::{ - map::{rayon::ParValues, Entry, IndexMap}, + map::{Entry, IndexMap, rayon::ParValues}, set::IndexSet, }, lru::LruCache, - rayon::{prelude::*, ThreadPool}, + rayon::{ThreadPool, prelude::*}, solana_clock::Slot, solana_hash::Hash, solana_pubkey::Pubkey, std::{ cmp::Ordering, - collections::{hash_map, BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque, hash_map}, ops::{Bound, Index, IndexMut}, sync::Mutex, }, @@ -192,12 +192,11 @@ fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool { // Contact-infos are special cased so that if there are // two running instances of the same node, the more recent start is // propagated through gossip regardless of wallclocks. - if let CrdsData::ContactInfo(value) = value.data() { - if let CrdsData::ContactInfo(other) = other.value.data() { - if let Some(out) = value.overrides(other) { - return out; - } - } + if let CrdsData::ContactInfo(value) = value.data() + && let CrdsData::ContactInfo(other) = other.value.data() + && let Some(out) = value.overrides(other) + { + return out; } match value.wallclock().cmp(&other.value.wallclock()) { Ordering::Less => false, @@ -518,16 +517,15 @@ impl Crds { // If the origin's contact-info hasn't expired yet then preserve // all associated values. let origin = CrdsValueLabel::ContactInfo(*pubkey); - if let Some(origin) = self.table.get(&origin) { - if origin + if let Some(origin) = self.table.get(&origin) + && origin .value .wallclock() .min(origin.local_timestamp) .saturating_add(timeout) > now - { - return vec![]; - } + { + return vec![]; } // Otherwise check each value's timestamp individually. index @@ -698,11 +696,11 @@ impl Default for CrdsDataStats { impl CrdsDataStats { fn record_insert(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) { self.counts[Self::ordinal(entry)] += 1; - if let CrdsData::Vote(_, vote) = entry.value.data() { - if let Some(slot) = vote.slot() { - let num_nodes = self.votes.get(&slot).copied().unwrap_or_default(); - self.votes.put(slot, num_nodes + 1); - } + if let CrdsData::Vote(_, vote) = entry.value.data() + && let Some(slot) = vote.slot() + { + let num_nodes = self.votes.get(&slot).copied().unwrap_or_default(); + self.votes.put(slot, num_nodes + 1); } let GossipRoute::PushMessage(from) = route else { @@ -781,8 +779,8 @@ impl CrdsStats { mod tests { use { super::*, - crate::crds_data::{new_rand_timestamp, AccountsHashes}, - rand::{rng, Rng}, + crate::crds_data::{AccountsHashes, new_rand_timestamp}, + rand::{Rng, rng}, rayon::ThreadPoolBuilder, solana_keypair::Keypair, solana_signer::Signer, diff --git a/gossip/src/crds_data.rs b/gossip/src/crds_data.rs index 216c78b75edfa3..7de2ee7f55c0eb 100644 --- a/gossip/src/crds_data.rs +++ b/gossip/src/crds_data.rs @@ -8,7 +8,7 @@ use { restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots}, }, rand::Rng, - serde::{de::Deserializer, Deserialize, Serialize}, + serde::{Deserialize, Serialize, de::Deserializer}, solana_clock::Slot, solana_hash::Hash, solana_pubkey::{self, Pubkey}, diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 6a0421334b520e..aafa28aca1b202 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -23,10 +23,10 @@ use { }, itertools::Itertools, rand::{ - distr::{weighted::WeightedIndex, Distribution}, Rng, + distr::{Distribution, weighted::WeightedIndex}, }, - rayon::{prelude::*, ThreadPool}, + rayon::{ThreadPool, prelude::*}, serde::{Deserialize, Serialize}, solana_bloom::bloom::{Bloom, ConcurrentBloom}, solana_hash::Hash, @@ -43,8 +43,8 @@ use { net::SocketAddr, ops::Index, sync::{ - atomic::{AtomicI64, AtomicUsize, Ordering}, LazyLock, Mutex, RwLock, + atomic::{AtomicI64, AtomicUsize, Ordering}, }, time::Duration, }, @@ -680,7 +680,7 @@ pub(crate) mod tests { protocol::Protocol, }, itertools::Itertools, - rand::{prelude::IndexedRandom as _, SeedableRng}, + rand::{SeedableRng, prelude::IndexedRandom as _}, rand_chacha::ChaChaRng, rayon::ThreadPoolBuilder, solana_hash::HASH_BYTES, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 57ad91e12380f8..c24a586a91e9fc 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -34,8 +34,8 @@ use { net::SocketAddr, ops::{DerefMut, RangeBounds}, sync::{ - atomic::{AtomicUsize, Ordering}, Mutex, RwLock, + atomic::{AtomicUsize, Ordering}, }, }, }; @@ -348,9 +348,10 @@ mod tests { assert_eq!(crds.read().unwrap().get::<&CrdsValue>(&label), Some(&value)); // push it again - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) + .is_empty() + ); } #[test] fn test_process_push_old_version() { @@ -369,9 +370,10 @@ mod tests { // push an old version ci.set_wallclock(0); let value = CrdsValue::new_unsigned(CrdsData::from(ci)); - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) + .is_empty() + ); } #[test] fn test_process_push_timeout() { @@ -383,16 +385,18 @@ mod tests { // push a version to far in the future ci.set_wallclock(timeout + 1); let value = CrdsValue::new_unsigned(CrdsData::from(&ci)); - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) + .is_empty() + ); // push a version to far in the past ci.set_wallclock(0); let value = CrdsValue::new_unsigned(CrdsData::from(ci)); - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value])], timeout + 1) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], timeout + 1) + .is_empty() + ); } #[test] fn test_process_push_update() { @@ -641,13 +645,15 @@ mod tests { ); // push it again - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value.clone()])], 0) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value.clone()])], 0) + .is_empty() + ); // push it again - assert!(push - .process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) - .is_empty()); + assert!( + push.process_push_message(&crds, vec![(Pubkey::default(), vec![value])], 0) + .is_empty() + ); } } diff --git a/gossip/src/crds_shards.rs b/gossip/src/crds_shards.rs index c38ac234bd84c2..c1faaa9256819e 100644 --- a/gossip/src/crds_shards.rs +++ b/gossip/src/crds_shards.rs @@ -137,7 +137,7 @@ mod test { crds::{Crds, GossipRoute}, crds_value::CrdsValue, }, - rand::{rng, Rng}, + rand::{Rng, rng}, solana_time_utils::timestamp, std::{collections::HashSet, iter::repeat_with, ops::Index}, }; diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index bf5ab2824530d1..c11c7ebac81c73 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -8,9 +8,9 @@ use { arrayvec::ArrayVec, bincode::serialize, rand::Rng, - serde::{de::Deserializer, Deserialize, Serialize}, + serde::{Deserialize, Serialize, de::Deserializer}, solana_hash::Hash, - solana_keypair::{signable::Signable, Keypair}, + solana_keypair::{Keypair, signable::Signable}, solana_packet::PACKET_DATA_SIZE, solana_pubkey::Pubkey, solana_sanitize::{Sanitize, SanitizeError}, diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index c52e9aed6b9e69..4f22ee9aadbcf3 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -11,7 +11,7 @@ use { solana_pubkey::Pubkey, solana_sanitize::{Sanitize, SanitizeError}, std::{ - collections::{hash_map::Entry, HashMap}, + collections::{HashMap, hash_map::Entry}, convert::TryFrom, num::TryFromIntError, }, diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs index d7d4da27ae4b2c..6c74fb00d2b7fd 100644 --- a/gossip/src/duplicate_shred_handler.rs +++ b/gossip/src/duplicate_shred_handler.rs @@ -238,7 +238,7 @@ mod tests { itertools::Itertools, solana_keypair::Keypair, solana_ledger::{ - genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, + genesis_utils::{GenesisConfigInfo, create_genesis_config_with_leader}, get_tmp_ledger_path_auto_delete, shred::Shredder, }, diff --git a/gossip/src/duplicate_shred_listener.rs b/gossip/src/duplicate_shred_listener.rs index 2c7be1e56e4267..3a45e1db2ed7e8 100644 --- a/gossip/src/duplicate_shred_listener.rs +++ b/gossip/src/duplicate_shred_listener.rs @@ -6,10 +6,10 @@ use { }, std::{ sync::{ - atomic::{AtomicBool, Ordering}, Arc, + atomic::{AtomicBool, Ordering}, }, - thread::{self, sleep, Builder, JoinHandle}, + thread::{self, Builder, JoinHandle, sleep}, time::Duration, }, }; @@ -77,8 +77,8 @@ mod tests { solana_net_utils::SocketAddrSpace, solana_signer::Signer, std::sync::{ - atomic::{AtomicU32, Ordering}, Arc, + atomic::{AtomicU32, Ordering}, }, }; struct FakeHandler { @@ -118,9 +118,11 @@ mod tests { let leader = Arc::new(Keypair::new()); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - assert!(cluster_info - .push_duplicate_shred(&shred1, shred2.payload()) - .is_ok()); + assert!( + cluster_info + .push_duplicate_shred(&shred1, shred2.payload()) + .is_ok() + ); cluster_info.flush_push_queue(); sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); assert_eq!(count.load(Ordering::Relaxed), 3); diff --git a/gossip/src/epoch_specs.rs b/gossip/src/epoch_specs.rs index cd40bf5be67273..ac3a62e01c8f4c 100644 --- a/gossip/src/epoch_specs.rs +++ b/gossip/src/epoch_specs.rs @@ -1,5 +1,5 @@ use { - solana_clock::{Epoch, DEFAULT_MS_PER_SLOT}, + solana_clock::{DEFAULT_MS_PER_SLOT, Epoch}, solana_epoch_schedule::EpochSchedule, solana_pubkey::Pubkey, solana_runtime::{ @@ -87,7 +87,7 @@ mod tests { use { super::*, solana_clock::Slot, - solana_runtime::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_runtime::genesis_utils::{GenesisConfigInfo, create_genesis_config}, }; #[test] diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 97178aaeb4ae3e..7a5cf126f84d33 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -8,10 +8,10 @@ use { epoch_specs::EpochSpecs, }, crossbeam_channel::Sender, - rand::{rng, Rng}, + rand::{Rng, rng}, solana_client::{connection_cache::ConnectionCache, tpu_client::TpuClientWrapper}, solana_keypair::Keypair, - solana_net_utils::{SocketAddrSpace, DEFAULT_IP_ECHO_SERVER_THREADS}, + solana_net_utils::{DEFAULT_IP_ECHO_SERVER_THREADS, SocketAddrSpace}, solana_perf::recycler::Recycler, solana_pubkey::Pubkey, solana_rpc_client::rpc_client::RpcClient, @@ -26,10 +26,10 @@ use { collections::HashSet, net::{SocketAddr, TcpListener, UdpSocket}, sync::{ - atomic::{AtomicBool, Ordering}, Arc, RwLock, + atomic::{AtomicBool, Ordering}, }, - thread::{self, sleep, Builder, JoinHandle}, + thread::{self, Builder, JoinHandle, sleep}, time::{Duration, Instant}, }, }; @@ -385,7 +385,7 @@ mod tests { use { super::*, crate::{cluster_info::ClusterInfo, contact_info::ContactInfo, node::Node}, - std::sync::{atomic::AtomicBool, Arc}, + std::sync::{Arc, atomic::AtomicBool}, }; #[test] diff --git a/gossip/src/node.rs b/gossip/src/node.rs index b5c29eeaa1da0f..190c96ec4330ae 100644 --- a/gossip/src/node.rs +++ b/gossip/src/node.rs @@ -10,9 +10,9 @@ use { find_available_ports_in_range, multihomed_sockets::BindIpAddrs, sockets::{ - bind_gossip_port_in_range, bind_in_range_with_config, bind_more_with_config, - bind_to_with_config, localhost_port_range_for_tests, multi_bind_in_range_with_config, - SocketConfiguration as SocketConfig, + SocketConfiguration as SocketConfig, bind_gossip_port_in_range, + bind_in_range_with_config, bind_more_with_config, bind_to_with_config, + localhost_port_range_for_tests, multi_bind_in_range_with_config, }, }, solana_pubkey::Pubkey, diff --git a/gossip/src/ping_pong.rs b/gossip/src/ping_pong.rs index b695040b4ed2d2..73945e7fb01f79 100644 --- a/gossip/src/ping_pong.rs +++ b/gossip/src/ping_pong.rs @@ -6,7 +6,7 @@ use { serde_big_array::BigArray, siphasher::sip::SipHasher24, solana_hash::Hash, - solana_keypair::{signable::Signable, Keypair}, + solana_keypair::{Keypair, signable::Signable}, solana_pubkey::Pubkey, solana_sanitize::{Sanitize, SanitizeError}, solana_signature::Signature, @@ -187,18 +187,18 @@ impl PingCache { return false; }; self.pongs.put(remote_node, now); - if let Some(sent_time) = self.ping_times.pop(&socket.ip()) { - if should_report_message_signature( + if let Some(sent_time) = self.ping_times.pop(&socket.ip()) + && should_report_message_signature( pong.signature(), PONG_SIGNATURE_SAMPLE_LEADING_ZEROS, - ) { - let rtt = now.saturating_duration_since(sent_time); - datapoint_info!( - "ping_rtt", - ("peer_ip", socket.ip().to_string(), String), - ("rtt_us", rtt.as_micros() as i64, i64), - ); - } + ) + { + let rtt = now.saturating_duration_since(sent_time); + datapoint_info!( + "ping_rtt", + ("peer_ip", socket.ip().to_string(), String), + ("rtt_us", rtt.as_micros() as i64, i64), + ); } true } diff --git a/gossip/src/protocol.rs b/gossip/src/protocol.rs index 7a789421f15cb4..b09771e2b0ec5c 100644 --- a/gossip/src/protocol.rs +++ b/gossip/src/protocol.rs @@ -230,25 +230,27 @@ pub(crate) fn split_gossip_messages( let mut data_feed = data_feed.into_iter().fuse(); let mut buffer = vec![]; let mut buffer_size = 0; // Serialized size of buffered values. - std::iter::from_fn(move || loop { - let Some(data) = data_feed.next() else { - return (!buffer.is_empty()).then(|| std::mem::take(&mut buffer)); - }; - let data_size = match bincode::serialized_size(&data) { - Ok(size) => size as usize, - Err(err) => { - error!("serialized_size failed: {err:?}"); - continue; + std::iter::from_fn(move || { + loop { + let Some(data) = data_feed.next() else { + return (!buffer.is_empty()).then(|| std::mem::take(&mut buffer)); + }; + let data_size = match bincode::serialized_size(&data) { + Ok(size) => size as usize, + Err(err) => { + error!("serialized_size failed: {err:?}"); + continue; + } + }; + if buffer_size + data_size <= max_chunk_size { + buffer_size += data_size; + buffer.push(data); + } else if data_size <= max_chunk_size { + buffer_size = data_size; + return Some(std::mem::replace(&mut buffer, vec![data])); + } else { + error!("dropping data larger than the maximum chunk size {data:?}",); } - }; - if buffer_size + data_size <= max_chunk_size { - buffer_size += data_size; - buffer.push(data); - } else if data_size <= max_chunk_size { - buffer_size = data_size; - return Some(std::mem::replace(&mut buffer, vec![data])); - } else { - error!("dropping data larger than the maximum chunk size {data:?}",); } }) } @@ -262,7 +264,7 @@ pub(crate) mod tests { crds_data::{ self, AccountsHashes, CrdsData, LowestSlot, SnapshotHashes, Vote as CrdsVote, }, - duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, + duplicate_shred::{self, MAX_DUPLICATE_SHREDS, tests::new_rand_shred}, }, rand::Rng, solana_clock::Slot, diff --git a/gossip/src/push_active_set.rs b/gossip/src/push_active_set.rs index bce579628e5571..89692ec22f5bd1 100644 --- a/gossip/src/push_active_set.rs +++ b/gossip/src/push_active_set.rs @@ -226,39 +226,55 @@ mod tests { } let other = &nodes[5]; let origin = &nodes[17]; - assert!(active_set - .get_nodes(&pubkey, origin, &stakes) - .eq([13, 5, 18, 16, 0].into_iter().map(|k| &nodes[k]))); - assert!(active_set - .get_nodes(&pubkey, other, &stakes) - .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k]))); + assert!( + active_set + .get_nodes(&pubkey, origin, &stakes) + .eq([13, 5, 18, 16, 0].into_iter().map(|k| &nodes[k])) + ); + assert!( + active_set + .get_nodes(&pubkey, other, &stakes) + .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])) + ); active_set.prune(&pubkey, &nodes[5], &[*origin], &stakes); active_set.prune(&pubkey, &nodes[3], &[*origin], &stakes); active_set.prune(&pubkey, &nodes[16], &[*origin], &stakes); - assert!(active_set - .get_nodes(&pubkey, origin, &stakes) - .eq([13, 18, 0].into_iter().map(|k| &nodes[k]))); - assert!(active_set - .get_nodes(&pubkey, other, &stakes) - .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k]))); + assert!( + active_set + .get_nodes(&pubkey, origin, &stakes) + .eq([13, 18, 0].into_iter().map(|k| &nodes[k])) + ); + assert!( + active_set + .get_nodes(&pubkey, other, &stakes) + .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])) + ); active_set.rotate(&mut rng, 7, CLUSTER_SIZE, &nodes, &stakes); assert!(active_set.0.iter().all(|entry| entry.0.len() == 7)); - assert!(active_set - .get_nodes(&pubkey, origin, &stakes) - .eq([18, 0, 7, 15, 11].into_iter().map(|k| &nodes[k]))); - assert!(active_set - .get_nodes(&pubkey, other, &stakes) - .eq([18, 16, 0, 7, 15, 11].into_iter().map(|k| &nodes[k]))); + assert!( + active_set + .get_nodes(&pubkey, origin, &stakes) + .eq([18, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])) + ); + assert!( + active_set + .get_nodes(&pubkey, other, &stakes) + .eq([18, 16, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])) + ); let origins = [*origin, *other]; active_set.prune(&pubkey, &nodes[18], &origins, &stakes); active_set.prune(&pubkey, &nodes[0], &origins, &stakes); active_set.prune(&pubkey, &nodes[15], &origins, &stakes); - assert!(active_set - .get_nodes(&pubkey, origin, &stakes) - .eq([7, 11].into_iter().map(|k| &nodes[k]))); - assert!(active_set - .get_nodes(&pubkey, other, &stakes) - .eq([16, 7, 11].into_iter().map(|k| &nodes[k]))); + assert!( + active_set + .get_nodes(&pubkey, origin, &stakes) + .eq([7, 11].into_iter().map(|k| &nodes[k])) + ); + assert!( + active_set + .get_nodes(&pubkey, other, &stakes) + .eq([16, 7, 11].into_iter().map(|k| &nodes[k])) + ); } #[test] @@ -284,9 +300,11 @@ mod tests { if !keys.contains(&origin) { assert!(entry.get_nodes(pubkey, origin).eq(keys)); } else { - assert!(entry - .get_nodes(pubkey, origin) - .eq(keys.into_iter().filter(|&key| key != origin))); + assert!( + entry + .get_nodes(pubkey, origin) + .eq(keys.into_iter().filter(|&key| key != origin)) + ); } } // Assert that each filter already prunes the key. @@ -294,9 +312,11 @@ mod tests { assert!(filter.contains(node)); } for (pubkey, origin) in iproduct!(&nodes, keys) { - assert!(entry - .get_nodes(pubkey, origin) - .eq(keys.into_iter().filter(|&node| node != origin))); + assert!( + entry + .get_nodes(pubkey, origin) + .eq(keys.into_iter().filter(|&node| node != origin)) + ); } // Assert that prune excludes node from get. let origin = &nodes[3]; @@ -304,9 +324,11 @@ mod tests { entry.prune(&nodes[14], origin); entry.prune(&nodes[19], origin); for pubkey in &nodes { - assert!(entry.get_nodes(pubkey, origin).eq(keys - .into_iter() - .filter(|&&node| pubkey == origin || (node != nodes[11] && node != nodes[14])))); + assert!( + entry.get_nodes(pubkey, origin).eq(keys + .into_iter() + .filter(|&&node| pubkey == origin || (node != nodes[11] && node != nodes[14]))) + ); } // Assert that rotate adds new nodes. entry.rotate(&mut rng, 5, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights); diff --git a/gossip/src/weighted_shuffle.rs b/gossip/src/weighted_shuffle.rs index 0ce74648688db7..ece56349361d79 100644 --- a/gossip/src/weighted_shuffle.rs +++ b/gossip/src/weighted_shuffle.rs @@ -266,16 +266,20 @@ mod tests { })); assert!(mask.iter().all(|&x| x)); // Assert that the random shuffle is weighted. - assert!(shuffle - .chunks(shuffle.len() / 10) - .map(|chunk| chunk.iter().map(|&i| weights[i]).sum::()) - .tuple_windows() - .all(|(a, b)| a > b)); + assert!( + shuffle + .chunks(shuffle.len() / 10) + .map(|chunk| chunk.iter().map(|&i| weights[i]).sum::()) + .tuple_windows() + .all(|(a, b)| a > b) + ); // Assert that zero weights only appear at the end of the shuffle. - assert!(shuffle - .iter() - .tuple_windows() - .all(|(&i, &j)| weights[i] != 0 || weights[j] == 0)); + assert!( + shuffle + .iter() + .tuple_windows() + .all(|(&i, &j)| weights[i] != 0 || weights[j] == 0) + ); } fn weighted_shuffle_slow(rng: &mut R, mut weights: Vec) -> Vec @@ -445,7 +449,9 @@ mod tests { let mut shuffle = WeightedShuffle::new("", weights); assert_eq!( shuffle.clone().shuffle(&mut rng).collect::>(), - [10, 3, 14, 18, 0, 9, 19, 6, 2, 1, 17, 7, 13, 15, 20, 12, 4, 8, 5, 16, 11] + [ + 10, 3, 14, 18, 0, 9, 19, 6, 2, 1, 17, 7, 13, 15, 20, 12, 4, 8, 5, 16, 11 + ] ); let mut rng = ChaChaRng::from_seed(seed); assert_eq!(shuffle.first(&mut rng), Some(10)); @@ -465,7 +471,9 @@ mod tests { let mut shuffle = WeightedShuffle::new("", weights); assert_eq!( shuffle.clone().shuffle(&mut rng).collect::>(), - [3, 15, 10, 6, 19, 17, 2, 0, 9, 20, 1, 14, 7, 8, 12, 18, 4, 13, 5, 11, 16] + [ + 3, 15, 10, 6, 19, 17, 2, 0, 9, 20, 1, 14, 7, 8, 12, 18, 4, 13, 5, 11, 16 + ] ); let mut rng = ChaChaRng::from_seed(seed); assert_eq!(shuffle.first(&mut rng), Some(3)); diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 570c6768d1d2e2..ff970bf1f529b9 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -3,7 +3,7 @@ use { bincode::serialized_size, itertools::Itertools, log::*, - rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, + rayon::{ThreadPool, ThreadPoolBuilder, prelude::*}, serial_test::serial, solana_gossip::{ cluster_info_metrics::GossipStats, @@ -13,7 +13,7 @@ use { crds_gossip::*, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{ - CrdsTimeouts, ProcessPullStats, PullRequest, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, CrdsTimeouts, ProcessPullStats, PullRequest, }, crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, crds_value::{CrdsValue, CrdsValueLabel}, diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 3522188e824012..0d80511fbb8e5c 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -18,15 +18,15 @@ use { solana_pubkey::Pubkey, solana_runtime::bank_forks::BankForks, solana_signer::Signer, - solana_streamer::sendmmsg::{multi_target_send, SendPktsError}, + solana_streamer::sendmmsg::{SendPktsError, multi_target_send}, solana_time_utils::timestamp, solana_transaction::Transaction, solana_vote_program::{vote_instruction, vote_state::Vote}, std::{ net::UdpSocket, sync::{ - atomic::{AtomicBool, Ordering}, Arc, RwLock, + atomic::{AtomicBool, Ordering}, }, thread::sleep, time::Duration, @@ -290,7 +290,7 @@ pub fn cluster_info_scale() { solana_perf::test_tx::test_tx, solana_runtime::{ bank::Bank, - genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, + genesis_utils::{ValidatorVoteKeypairs, create_genesis_config_with_vote_accounts}, }, }; agave_logger::setup(); diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index 18159296c96b06..0fb2e0f273c1ef 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -7,7 +7,7 @@ authors = { workspace = true } repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } -edition = { workspace = true } +edition = "2024" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/net-utils/benches/token_bucket.rs b/net-utils/benches/token_bucket.rs index 803a373e6eb15f..36d47e712589ab 100644 --- a/net-utils/benches/token_bucket.rs +++ b/net-utils/benches/token_bucket.rs @@ -24,34 +24,38 @@ fn bench_token_bucket() { std::thread::scope(|scope| { for _ in 0..workers { - scope.spawn(|| loop { - if start.elapsed() > run_duration { - break; + scope.spawn(|| { + loop { + if start.elapsed() > run_duration { + break; + } + match tb.consume_tokens(request_size) { + Ok(_) => accepted.fetch_add(1, Ordering::Relaxed), + Err(_) => rejected.fetch_add(1, Ordering::Relaxed), + }; } - match tb.consume_tokens(request_size) { - Ok(_) => accepted.fetch_add(1, Ordering::Relaxed), - Err(_) => rejected.fetch_add(1, Ordering::Relaxed), - }; }); } // periodically check for races - let jh = scope.spawn(|| loop { - std::thread::sleep(Duration::from_millis(100)); - let elapsed = start.elapsed(); - if elapsed > run_duration { - break; + let jh = scope.spawn(|| { + loop { + std::thread::sleep(Duration::from_millis(100)); + let elapsed = start.elapsed(); + if elapsed > run_duration { + break; + } + let acc = accepted.load(Ordering::Relaxed); + let rate = acc as f64 / elapsed.as_secs_f64(); + assert!( + tb.current_tokens() < request_size * 2, + "bucket should have no spare tokens" + ); + assert!( + // allow 1% error + (rate - target_rate).abs() < target_rate / 100.0, + "Accepted rate should be about {target_rate}, actual {rate}" + ); } - let acc = accepted.load(Ordering::Relaxed); - let rate = acc as f64 / elapsed.as_secs_f64(); - assert!( - tb.current_tokens() < request_size * 2, - "bucket should have no spare tokens" - ); - assert!( - // allow 1% error - (rate - target_rate).abs() < target_rate / 100.0, - "Accepted rate should be about {target_rate}, actual {rate}" - ); }); jh.join().expect("Rate checks should pass"); }); diff --git a/net-utils/src/ip_echo_client.rs b/net-utils/src/ip_echo_client.rs index c1ed42ebc5e50a..7045720a50ce0a 100644 --- a/net-utils/src/ip_echo_client.rs +++ b/net-utils/src/ip_echo_client.rs @@ -1,7 +1,7 @@ use { crate::{ - ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse}, HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH, MAX_PORT_COUNT_PER_MESSAGE, + ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse}, }, anyhow::bail, bytes::{BufMut, BytesMut}, diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 0a32d2b8d215db..77f427985a8241 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -1,5 +1,5 @@ use { - crate::{bind_to_unspecified, HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, + crate::{HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH, bind_to_unspecified}, log::*, serde::{Deserialize, Serialize}, solana_serde::default_on_eof, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 4c0ebf5bb24f58..466e46944b7cde 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -30,7 +30,7 @@ pub mod tooling_for_tests; use { ip_echo_client::{ip_echo_server_request, ip_echo_server_request_with_binding}, ip_echo_server::IpEchoServerMessage, - rand::{rng, Rng}, + rand::{Rng, rng}, std::{ io::{self}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, ToSocketAddrs, UdpSocket}, @@ -39,8 +39,8 @@ use { }; pub use { ip_echo_server::{ - ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE, - MINIMUM_IP_ECHO_SERVER_THREADS, + DEFAULT_IP_ECHO_SERVER_THREADS, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE, + MINIMUM_IP_ECHO_SERVER_THREADS, ip_echo_server, }, socket_addr_space::SocketAddrSpace, }; diff --git a/net-utils/src/multihomed_sockets.rs b/net-utils/src/multihomed_sockets.rs index 3162e65685d76f..8299b58c992f91 100644 --- a/net-utils/src/multihomed_sockets.rs +++ b/net-utils/src/multihomed_sockets.rs @@ -3,8 +3,8 @@ use std::{ net::{IpAddr, Ipv4Addr, UdpSocket}, ops::Deref, sync::{ - atomic::{AtomicUsize, Ordering}, Arc, + atomic::{AtomicUsize, Ordering}, }, }; diff --git a/net-utils/src/sockets.rs b/net-utils/src/sockets.rs index 7c538e87b489c4..bdf662b292ae8b 100644 --- a/net-utils/src/sockets.rs +++ b/net-utils/src/sockets.rs @@ -340,11 +340,11 @@ mod tests { use { super::*, crate::{ - bind_in_range, get_cluster_shred_version, get_public_ip_addr_with_binding, - ip_echo_client, ip_echo_server, parse_host, + DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_VERIFY_THREADS, bind_in_range, + get_cluster_shred_version, get_public_ip_addr_with_binding, ip_echo_client, + ip_echo_server, parse_host, sockets::{localhost_port_range_for_tests, unique_port_range_for_tests}, - verify_all_reachable_tcp, verify_all_reachable_udp, DEFAULT_IP_ECHO_SERVER_THREADS, - MAX_PORT_VERIFY_THREADS, + verify_all_reachable_tcp, verify_all_reachable_udp, }, itertools::Itertools, std::{net::Ipv4Addr, time::Duration}, @@ -448,14 +448,16 @@ mod tests { ) { assert!(port2 == port1 + offset); } - assert!(bind_two_in_range_with_offset_and_config( - ip_addr, - (port_range.start, port_range.start + 5), - offset, - config, - config - ) - .is_err()); + assert!( + bind_two_in_range_with_offset_and_config( + ip_addr, + (port_range.start, port_range.start + 5), + offset, + config, + config + ) + .is_err() + ); } #[test] diff --git a/net-utils/src/token_bucket.rs b/net-utils/src/token_bucket.rs index a5c3f8ccc4b2db..04dcf8e3c661ae 100644 --- a/net-utils/src/token_bucket.rs +++ b/net-utils/src/token_bucket.rs @@ -4,7 +4,7 @@ //! as connections. use { cfg_if::cfg_if, - dashmap::{mapref::entry::Entry, DashMap}, + dashmap::{DashMap, mapref::entry::Entry}, solana_svm_type_overrides::sync::atomic::{AtomicU64, AtomicUsize, Ordering}, std::{borrow::Borrow, cmp::Reverse, hash::Hash, time::Instant}, }; diff --git a/tls-utils/Cargo.toml b/tls-utils/Cargo.toml index 6afd2b351f3a12..559c5f9295cd75 100644 --- a/tls-utils/Cargo.toml +++ b/tls-utils/Cargo.toml @@ -7,7 +7,7 @@ authors = { workspace = true } repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } -edition = { workspace = true } +edition = "2024" [features] agave-unstable-api = [] diff --git a/tls-utils/src/config.rs b/tls-utils/src/config.rs index c0f038821547fd..2cc9d29eef3bd0 100644 --- a/tls-utils/src/config.rs +++ b/tls-utils/src/config.rs @@ -1,6 +1,6 @@ use { rustls::{ - client::WantsClientCert, server::WantsServerCert, ClientConfig, ConfigBuilder, ServerConfig, + ClientConfig, ConfigBuilder, ServerConfig, client::WantsClientCert, server::WantsServerCert, }, std::sync::Arc, }; diff --git a/tls-utils/src/crypto_provider.rs b/tls-utils/src/crypto_provider.rs index 1e1d754fda4de8..32dbd57b628392 100644 --- a/tls-utils/src/crypto_provider.rs +++ b/tls-utils/src/crypto_provider.rs @@ -1,4 +1,4 @@ -use rustls::{crypto::CryptoProvider, NamedGroup}; +use rustls::{NamedGroup, crypto::CryptoProvider}; pub fn crypto_provider() -> CryptoProvider { let mut provider = rustls::crypto::ring::default_provider(); diff --git a/tls-utils/src/skip_client_verification.rs b/tls-utils/src/skip_client_verification.rs index 5ecc9bb866c990..b7f8a1b53fdc91 100644 --- a/tls-utils/src/skip_client_verification.rs +++ b/tls-utils/src/skip_client_verification.rs @@ -1,11 +1,11 @@ use { crate::crypto_provider, rustls::{ + DigitallySignedStruct, DistinguishedName, Error, SignatureScheme, client::danger::HandshakeSignatureValid, crypto::CryptoProvider, pki_types::{CertificateDer, UnixTime}, server::danger::{ClientCertVerified, ClientCertVerifier}, - DigitallySignedStruct, DistinguishedName, Error, SignatureScheme, }, std::{fmt::Debug, sync::Arc}, }; diff --git a/tls-utils/src/skip_server_verification.rs b/tls-utils/src/skip_server_verification.rs index 4fdef2c389679a..5e1dab5b5c2d14 100644 --- a/tls-utils/src/skip_server_verification.rs +++ b/tls-utils/src/skip_server_verification.rs @@ -1,10 +1,10 @@ use { crate::crypto_provider, rustls::{ + DigitallySignedStruct, Error, SignatureScheme, client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, - crypto::{verify_tls12_signature, verify_tls13_signature, CryptoProvider}, + crypto::{CryptoProvider, verify_tls12_signature, verify_tls13_signature}, pki_types::{CertificateDer, ServerName, UnixTime}, - DigitallySignedStruct, Error, SignatureScheme, }, std::{ fmt::{self, Debug, Formatter}, diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index 5b9c8ee87c4a12..2a84d45c4770df 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -7,7 +7,7 @@ description = { workspace = true } repository = { workspace = true } homepage = { workspace = true } license = { workspace = true } -edition = { workspace = true } +edition = "2024" [features] agave-unstable-api = [] diff --git a/turbine/benches/cluster_info.rs b/turbine/benches/cluster_info.rs index ed12982bbbad3c..b6714dd581014c 100644 --- a/turbine/benches/cluster_info.rs +++ b/turbine/benches/cluster_info.rs @@ -1,23 +1,23 @@ use { - bencher::{benchmark_group, benchmark_main, Bencher}, - rand::{rng, Rng}, + bencher::{Bencher, benchmark_group, benchmark_main}, + rand::{Rng, rng}, solana_entry::entry::Entry, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo, node::Node}, solana_hash::Hash, solana_keypair::Keypair, solana_ledger::{ - genesis_utils::{create_genesis_config, GenesisConfigInfo}, + genesis_utils::{GenesisConfigInfo, create_genesis_config}, shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, }, - solana_net_utils::{sockets::bind_to_localhost_unique, SocketAddrSpace}, + solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique}, solana_pubkey as pubkey, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_signer::Signer, - solana_time_utils::{timestamp, AtomicInterval}, + solana_time_utils::{AtomicInterval, timestamp}, solana_turbine::{ broadcast_stage::{ - broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastSocket, - BroadcastStage, + BroadcastSocket, BroadcastStage, broadcast_metrics::TransmitShredsStats, + broadcast_shreds, }, cluster_nodes::ClusterNodesCache, }, diff --git a/turbine/benches/cluster_nodes.rs b/turbine/benches/cluster_nodes.rs index 9ed4afe582ec77..00b6c09cd7903b 100644 --- a/turbine/benches/cluster_nodes.rs +++ b/turbine/benches/cluster_nodes.rs @@ -1,6 +1,6 @@ use { - bencher::{benchmark_group, benchmark_main, Bencher}, - rand::{prelude::IndexedRandom as _, Rng}, + bencher::{Bencher, benchmark_group, benchmark_main}, + rand::{Rng, prelude::IndexedRandom as _}, solana_clock::Slot, solana_cluster_type::ClusterType, solana_gossip::contact_info::ContactInfo, @@ -10,7 +10,7 @@ use { solana_net_utils::SocketAddrSpace, solana_pubkey::Pubkey, solana_turbine::{ - cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes}, + cluster_nodes::{ClusterNodes, make_test_cluster, new_cluster_nodes}, retransmit_stage::RetransmitStage, }, }; diff --git a/turbine/src/addr_cache.rs b/turbine/src/addr_cache.rs index b1033f5aae3f0d..52ffdf92d942fc 100644 --- a/turbine/src/addr_cache.rs +++ b/turbine/src/addr_cache.rs @@ -3,11 +3,11 @@ use { itertools::Itertools, solana_clock::Slot, solana_ledger::shred::{ - ShredId, ShredType, MAX_CODE_SHREDS_PER_SLOT, MAX_DATA_SHREDS_PER_SLOT, + MAX_CODE_SHREDS_PER_SLOT, MAX_DATA_SHREDS_PER_SLOT, ShredId, ShredType, }, std::{ cmp::Reverse, - collections::{hash_map::Entry, HashMap, VecDeque}, + collections::{HashMap, VecDeque, hash_map::Entry}, net::SocketAddr, }, }; @@ -397,9 +397,11 @@ mod tests { assert_eq!(entry.index_data, 2); entry.last_shred_in_slot = true; - assert!(entry - .get_shreds(7) - .eq([(ShredType::Code, 3), (ShredType::Data, 2)])); + assert!( + entry + .get_shreds(7) + .eq([(ShredType::Code, 3), (ShredType::Data, 2)]) + ); assert_eq!(entry.index_code, 3); assert_eq!(entry.index_data, 2); diff --git a/turbine/src/broadcast_stage.rs b/turbine/src/broadcast_stage.rs index ca72ccd041294d..4c7f1b913610ef 100644 --- a/turbine/src/broadcast_stage.rs +++ b/turbine/src/broadcast_stage.rs @@ -12,7 +12,7 @@ use { cluster_nodes::{ClusterNodes, ClusterNodesCache}, xdp::XdpSender, }, - crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, + crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender, unbounded}, itertools::Itertools, solana_clock::Slot, solana_gossip::{ @@ -27,14 +27,14 @@ use { solana_poh::poh_recorder::WorkingBankEntry, solana_pubkey::Pubkey, solana_runtime::{bank::MAX_LEADER_SCHEDULE_STAKES, bank_forks::BankForks}, - solana_streamer::sendmmsg::{batch_send, SendPktsError}, - solana_time_utils::{timestamp, AtomicInterval}, + solana_streamer::sendmmsg::{SendPktsError, batch_send}, + solana_time_utils::{AtomicInterval, timestamp}, std::{ collections::{HashMap, HashSet}, net::UdpSocket, sync::{ - atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, + atomic::{AtomicBool, Ordering}, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -389,16 +389,18 @@ impl BroadcastStage { let retransmit_thread = Builder::new() .name("solBroadcastRtx".to_string()) - .spawn(move || loop { - if let Some(res) = Self::handle_error( - Self::check_retransmit_signals( - &blockstore, - &retransmit_slots_receiver, - &socket_sender, - ), - "solana-broadcaster-retransmit-check_retransmit_signals", - ) { - return res; + .spawn(move || { + loop { + if let Some(res) = Self::handle_error( + Self::check_retransmit_signals( + &blockstore, + &retransmit_slots_receiver, + &socket_sender, + ), + "solana-broadcaster-retransmit-check_retransmit_signals", + ) { + return res; + } } }) .unwrap(); @@ -424,9 +426,11 @@ impl BroadcastStage { .get_data_shreds_for_slot(new_retransmit_slot, 0) .expect("My own shreds must be reconstructable"), ); - debug_assert!(data_shreds - .iter() - .all(|shred| shred.slot() == new_retransmit_slot)); + debug_assert!( + data_shreds + .iter() + .all(|shred| shred.slot() == new_retransmit_slot) + ); if !data_shreds.is_empty() { socket_sender.send((data_shreds, None))?; } @@ -437,9 +441,11 @@ impl BroadcastStage { .expect("My own shreds must be reconstructable"), ); - debug_assert!(coding_shreds - .iter() - .all(|shred| shred.slot() == new_retransmit_slot)); + debug_assert!( + coding_shreds + .iter() + .all(|shred| shred.slot() == new_retransmit_slot) + ); if !coding_shreds.is_empty() { socket_sender.send((coding_shreds, None))?; } @@ -563,15 +569,15 @@ pub mod test { solana_keypair::Keypair, solana_ledger::{ blockstore::Blockstore, - genesis_utils::{create_genesis_config, GenesisConfigInfo}, + genesis_utils::{GenesisConfigInfo, create_genesis_config}, get_tmp_ledger_path_auto_delete, - shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder}, + shred::{ProcessShredsStats, ReedSolomonCache, Shredder, max_ticks_per_n_shreds}, }, solana_runtime::bank::Bank, solana_signer::Signer, std::{ path::Path, - sync::{atomic::AtomicBool, Arc}, + sync::{Arc, atomic::AtomicBool}, thread::sleep, }, }; diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 140d959b21809f..d5ded506263fc4 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -275,12 +275,16 @@ impl BroadcastRun for BroadcastDuplicatesRun { // Store the original shreds that this node replayed blockstore_sender.send((original_last_data_shred.clone(), None))?; - assert!(original_last_data_shred - .iter() - .all(|shred| shred.slot() == bank.slot())); - assert!(partition_last_data_shred - .iter() - .all(|shred| shred.slot() == bank.slot())); + assert!( + original_last_data_shred + .iter() + .all(|shred| shred.slot() == bank.slot()) + ); + assert!( + partition_last_data_shred + .iter() + .all(|shred| shred.slot() == bank.slot()) + ); if let Some(duplicate_slot_sender) = &self.config.duplicate_slot_sender { let _ = duplicate_slot_sender.send(bank.slot()); diff --git a/turbine/src/broadcast_stage/broadcast_metrics.rs b/turbine/src/broadcast_stage/broadcast_metrics.rs index 85513f0053fa41..57926b400e80b1 100644 --- a/turbine/src/broadcast_stage/broadcast_metrics.rs +++ b/turbine/src/broadcast_stage/broadcast_metrics.rs @@ -166,15 +166,15 @@ impl SlotBroadcastStats { if let Some(num_expected_batches) = batch_info.num_expected_batches { slot_batch_counter.num_expected_batches = Some(num_expected_batches); } - if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches { - if slot_batch_counter.num_batches == num_expected_batches { - slot_batch_counter.broadcast_shred_stats.report_stats( - batch_info.slot, - batch_info.slot_start_ts, - batch_info.was_interrupted, - ); - should_delete = true; - } + if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches + && slot_batch_counter.num_batches == num_expected_batches + { + slot_batch_counter.broadcast_shred_stats.report_stats( + batch_info.slot, + batch_info.slot_start_ts, + batch_info.was_interrupted, + ); + should_delete = true; } } if should_delete { diff --git a/turbine/src/broadcast_stage/broadcast_utils.rs b/turbine/src/broadcast_stage/broadcast_utils.rs index 2b556f372c4cd7..fe242f96ef8f19 100644 --- a/turbine/src/broadcast_stage/broadcast_utils.rs +++ b/turbine/src/broadcast_stage/broadcast_utils.rs @@ -6,7 +6,7 @@ use { solana_hash::Hash, solana_ledger::{ blockstore::Blockstore, - shred::{self, get_data_shred_bytes_per_batch_typical, ProcessShredsStats}, + shred::{self, ProcessShredsStats, get_data_shred_bytes_per_batch_typical}, }, solana_poh::poh_recorder::WorkingBankEntry, solana_runtime::bank::Bank, @@ -198,7 +198,7 @@ mod tests { super::*, crossbeam_channel::unbounded, solana_genesis_config::GenesisConfig, - solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_ledger::genesis_utils::{GenesisConfigInfo, create_genesis_config}, solana_pubkey::Pubkey, solana_system_transaction as system_transaction, solana_transaction::Transaction, diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index b8d953db962777..4f14a61379da76 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -10,8 +10,8 @@ use { solana_hash::Hash, solana_keypair::Keypair, solana_ledger::shred::{ - ProcessShredsStats, ReedSolomonCache, Shred, ShredType, Shredder, MAX_CODE_SHREDS_PER_SLOT, - MAX_DATA_SHREDS_PER_SLOT, + MAX_CODE_SHREDS_PER_SLOT, MAX_DATA_SHREDS_PER_SLOT, ProcessShredsStats, ReedSolomonCache, + Shred, ShredType, Shredder, }, solana_time_utils::AtomicInterval, std::{borrow::Cow, sync::RwLock}, @@ -278,16 +278,16 @@ impl StandardBroadcastRun { // https://github.com/solana-labs/solana/blob/92a0b310c/turbine/src/broadcast_stage/standard_broadcast_run.rs#L132-L142 // By contrast Self::insert skips the 1st data shred with index zero: // https://github.com/solana-labs/solana/blob/92a0b310c/turbine/src/broadcast_stage/standard_broadcast_run.rs#L367-L373 - if let Some(shred) = shreds.iter().find(|shred| shred.is_data()) { - if shred.index() == 0 { - blockstore - .insert_cow_shreds( - [Cow::Borrowed(shred)], - None, // leader_schedule - true, // is_trusted - ) - .expect("Failed to insert shreds in blockstore"); - } + if let Some(shred) = shreds.iter().find(|shred| shred.is_data()) + && shred.index() == 0 + { + blockstore + .insert_cow_shreds( + [Cow::Borrowed(shred)], + None, // leader_schedule + true, // is_trusted + ) + .expect("Failed to insert shreds in blockstore"); } to_shreds_time.stop(); @@ -483,9 +483,9 @@ mod test { blockstore::Blockstore, genesis_utils::create_genesis_config, get_tmp_ledger_path, - shred::{max_ticks_per_n_shreds, DATA_SHREDS_PER_FEC_BLOCK}, + shred::{DATA_SHREDS_PER_FEC_BLOCK, max_ticks_per_n_shreds}, }, - solana_net_utils::{sockets::bind_to_localhost_unique, SocketAddrSpace}, + solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique}, solana_runtime::bank::Bank, solana_signer::Signer, std::{ops::Deref, sync::Arc, time::Duration}, @@ -678,18 +678,22 @@ mod test { ); // Broadcast stats for interrupted slot should be cleared - assert!(standard_broadcast_run - .transmit_shreds_stats - .lock() - .unwrap() - .get(interrupted_slot) - .is_none()); - assert!(standard_broadcast_run - .insert_shreds_stats - .lock() - .unwrap() - .get(interrupted_slot) - .is_none()); + assert!( + standard_broadcast_run + .transmit_shreds_stats + .lock() + .unwrap() + .get(interrupted_slot) + .is_none() + ); + assert!( + standard_broadcast_run + .insert_shreds_stats + .lock() + .unwrap() + .get(interrupted_slot) + .is_none() + ); // Try to fetch the incomplete ticks from blockstore, should succeed assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0); diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index daffefd1d28866..b34895eb076976 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -3,7 +3,7 @@ use { agave_feature_set::{self as feature_set}, itertools::Either, lazy_lru::LruCache, - rand::{seq::SliceRandom, Rng, RngCore, SeedableRng}, + rand::{Rng, RngCore, SeedableRng, seq::SliceRandom}, rand_chacha::{ChaCha8Rng, ChaChaRng}, solana_clock::{Epoch, Slot}, solana_cluster_type::ClusterType, diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index a49877b0f46db7..a9b2d0f64f6720 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -4,7 +4,7 @@ use { crate::{ addr_cache::AddrCache, cluster_nodes::{ - ClusterNodes, ClusterNodesCache, Error, DATA_PLANE_FANOUT, MAX_NUM_TURBINE_HOPS, + ClusterNodes, ClusterNodesCache, DATA_PLANE_FANOUT, Error, MAX_NUM_TURBINE_HOPS, }, xdp::XdpSender, }, @@ -12,7 +12,7 @@ use { crossbeam_channel::{Receiver, RecvError, Sender, TryRecvError}, lru::LruCache, rand::Rng, - rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, + rayon::{ThreadPool, ThreadPoolBuilder, prelude::*}, solana_clock::Slot, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -32,7 +32,7 @@ use { bank::{Bank, MAX_LEADER_SCHEDULE_STAKES}, bank_forks::BankForks, }, - solana_streamer::sendmmsg::{multi_target_send, SendPktsError}, + solana_streamer::sendmmsg::{SendPktsError, multi_target_send}, solana_time_utils::timestamp, std::{ borrow::Cow, @@ -40,8 +40,8 @@ use { net::{SocketAddr, UdpSocket}, ops::AddAssign, sync::{ - atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, + atomic::{AtomicU64, AtomicUsize, Ordering}, }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -481,14 +481,14 @@ fn retransmit_shred( let num_nodes = match socket { RetransmitSocket::Xdp(sender) => { let mut sent = num_addrs; - if num_addrs > 0 { - if let Err(e) = sender.try_send(key.index() as usize, addrs.to_vec(), shred) { - log::warn!("xdp channel full: {e:?}"); - stats - .num_shreds_dropped_xdp_full - .fetch_add(num_addrs, Ordering::Relaxed); - sent = 0; - } + if num_addrs > 0 + && let Err(e) = sender.try_send(key.index() as usize, addrs.to_vec(), shred) + { + log::warn!("xdp channel full: {e:?}"); + stats + .num_shreds_dropped_xdp_full + .fetch_add(num_addrs, Ordering::Relaxed); + sent = 0; } sent } @@ -883,13 +883,13 @@ fn notify_subscribers( .unwrap() .notify_first_shred_received(slot); } - if let Some(votor_event_sender) = votor_event_sender { - if let Err(err) = votor_event_sender.send(VotorEvent::FirstShred(slot)) { - warn!( - "Sending {:?} failed as channel became disconnected. Ignoring.", - err.into_inner() - ); - } + if let Some(votor_event_sender) = votor_event_sender + && let Err(err) = votor_event_sender.send(VotorEvent::FirstShred(slot)) + { + warn!( + "Sending {:?} failed as channel became disconnected. Ignoring.", + err.into_inner() + ); } } diff --git a/turbine/src/sigverify_shreds.rs b/turbine/src/sigverify_shreds.rs index 2e731a361e8ece..f6186f04ac074d 100644 --- a/turbine/src/sigverify_shreds.rs +++ b/turbine/src/sigverify_shreds.rs @@ -1,12 +1,12 @@ use { crate::{ - cluster_nodes::{check_feature_activation, ClusterNodesCache, DATA_PLANE_FANOUT}, + cluster_nodes::{ClusterNodesCache, DATA_PLANE_FANOUT, check_feature_activation}, retransmit_stage::RetransmitStage, }, agave_feature_set as feature_set, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, itertools::{Either, Itertools}, - rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, + rayon::{ThreadPool, ThreadPoolBuilder, prelude::*}, solana_clock::Slot, solana_gossip::cluster_info::ClusterInfo, solana_keypair::Keypair, @@ -17,7 +17,7 @@ use { layout::{get_shred, resign_packet}, wire::is_retransmitter_signed_variant, }, - sigverify_shreds::{verify_shreds, LruCache, SlotPubkeys}, + sigverify_shreds::{LruCache, SlotPubkeys, verify_shreds}, }, solana_perf::{ self, @@ -31,8 +31,8 @@ use { std::{ num::NonZeroUsize, sync::{ - atomic::{AtomicUsize, Ordering}, Arc, RwLock, + atomic::{AtomicUsize, Ordering}, }, thread::{Builder, JoinHandle}, time::{Duration, Instant}, @@ -555,7 +555,7 @@ mod tests { use { super::*, rand::Rng, - solana_entry::entry::{create_ticks, Entry}, + solana_entry::entry::{Entry, create_ticks}, solana_gossip::contact_info::ContactInfo, solana_hash::Hash, solana_keypair::Keypair, diff --git a/turbine/src/xdp.rs b/turbine/src/xdp.rs index a5468d899e08f6..9e715829f7d43c 100644 --- a/turbine/src/xdp.rs +++ b/turbine/src/xdp.rs @@ -19,7 +19,7 @@ use { std::{ error::Error, net::{Ipv4Addr, SocketAddr}, - sync::{atomic::AtomicBool, Arc}, + sync::{Arc, atomic::AtomicBool}, thread, }, };