diff --git a/Cargo.lock b/Cargo.lock index 375ac1d09a518a..3be7854d4cc7e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,6 +566,17 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "compression" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "console" version = "0.9.1" @@ -3764,6 +3775,7 @@ dependencies = [ "bs58 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6086,6 +6098,7 @@ dependencies = [ "checksum codespan-reporting 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab081a14ab8f9598ce826890fe896d0addee68c7a58ab49008369ccbb51510a8" "checksum colored 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cdb90b60f2927f8d76139c72dbde7e10c3a2bc47c8594c9c7a66529f2687c03" "checksum combine 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1645a65a99c7c8d345761f4b75a6ffe5be3b3b27a93ee731fccc5050ba6be97c" +"checksum compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3a82b366ae14633c67a1cbb4aa3738210a23f77d2868a0fd50faa23a956f9ec4" "checksum console 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f5d540c2d34ac9dd0deb5f3b5f54c36c79efa78f6b3ad19106a554d07a7b5d9f" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" diff --git a/core/Cargo.toml b/core/Cargo.toml index d7fb6e648646dc..e0bbb306ca5bc1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ bincode = "1.2.1" bs58 = "0.3.0" byteorder = "1.3.2" chrono = { version = "0.4.10", features = ["serde"] } +compression = "0.1.5" core_affinity = "0.5.10" crc = { version = "1.8.1", optional = true } crossbeam-channel = "0.3" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 67d8d38d8bfb2c..4cfb4f8bf0fa6e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,6 +12,7 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::crds_value::EpochIncompleteSlots; use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ @@ -26,6 +27,7 @@ use crate::{ weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; +use compression::prelude::*; use core::cmp; use itertools::Itertools; use solana_ledger::{bank_forks::BankForks, staking_utils}; @@ -68,6 +70,8 @@ const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HE /// The largest protocol header size const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; +const NUM_BITS_PER_BYTE: u64 = 8; + #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { NoPeers, @@ -307,10 +311,84 @@ impl ClusterInfo { ) } - pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet) { + pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> EpochIncompleteSlots { + if !incomplete_slots.is_empty() { + let first_slot = incomplete_slots + .iter() + .next() + .expect("expected to find at least one slot"); + let last_slot = incomplete_slots + .iter() + .next_back() + .expect("expected to find last slot"); + let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1; + let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 { + 1 + } else { + 0 + } + num_uncompressed_bits / NUM_BITS_PER_BYTE; + let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize]; + incomplete_slots.iter().for_each(|slot| { + let offset_from_first_slot = slot.saturating_sub(*first_slot); + let index = offset_from_first_slot / NUM_BITS_PER_BYTE; + let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; + uncompressed[index as usize] |= 1 << bit_index; + }); + if let Ok(compressed) = uncompressed + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>() + { + return EpochIncompleteSlots { + first: *first_slot, + compressed_list: compressed, + }; + } + } + EpochIncompleteSlots::default() + } + + pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); + + if let Ok(decompressed) = slots + .compressed_list + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>() + { + decompressed.iter().enumerate().for_each(|(i, val)| { + if *val != 0 { + (0..8).for_each(|bit_index| { + if (1 << bit_index & *val) != 0 { + let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index; + old_incomplete_slots.insert(slot as u64); + } + }) + } + }) + } + + old_incomplete_slots + } + + pub fn push_epoch_slots( + &mut self, + id: Pubkey, + root: Slot, + min: Slot, + slots: BTreeSet, + incomplete_slots: &BTreeSet, + ) { + let compressed = Self::compress_incomplete_slots(incomplete_slots); let now = timestamp(); let entry = CrdsValue::new_signed( - CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)), + CrdsData::EpochSlots( + 0, + EpochSlots::new(id, root, min, slots, vec![compressed], now), + ), &self.keypair, ); self.gossip @@ -2120,13 +2198,17 @@ mod tests { for i in 0..128 { btree_slots.insert(i); } - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: btree_slots, - wallclock: 0, - })); + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: btree_slots, + stash: vec![], + wallclock: 0, + }, + )); test_split_messages(value); } @@ -2137,13 +2219,17 @@ mod tests { let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; - let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: BTreeSet::new(), - wallclock: 0, - })); + let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: BTreeSet::new(), + stash: vec![], + wallclock: 0, + }, + )); let mut i = 0; while value.size() <= desired_size { @@ -2155,13 +2241,17 @@ mod tests { desired_size ); } - value.data = CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots, - wallclock: 0, - }); + value.data = CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots, + stash: vec![], + wallclock: 0, + }, + ); i += 1; } let split = ClusterInfo::split_gossip_messages(vec![value.clone()]); @@ -2301,13 +2391,17 @@ mod tests { let other_node_pubkey = Pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); cluster_info.insert_info(other_node.clone()); - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - other_node_pubkey, - peer_root, - peer_lowest, - BTreeSet::new(), - timestamp(), - ))); + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots::new( + other_node_pubkey, + peer_root, + peer_lowest, + BTreeSet::new(), + vec![], + timestamp(), + ), + )); let _ = cluster_info.gossip.crds.insert(value, timestamp()); } // only half the visible peers should be eligible to serve this repair @@ -2367,4 +2461,32 @@ mod tests { serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; PACKET_DATA_SIZE - (protocol_size - filter_size) } + + #[test] + fn test_compress_incomplete_slots() { + let mut incomplete_slots: BTreeSet = BTreeSet::new(); + + assert_eq!( + EpochIncompleteSlots::default(), + ClusterInfo::compress_incomplete_slots(&incomplete_slots) + ); + + incomplete_slots.insert(100); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(104); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(80); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index e6c87022bc51bb..8b9017aa27ec6a 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -15,6 +15,8 @@ use std::{ pub type VoteIndex = u8; pub const MAX_VOTES: VoteIndex = 32; +pub type EpochSlotIndex = u8; + /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct CrdsValue { @@ -58,7 +60,13 @@ impl Signable for CrdsValue { pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), - EpochSlots(EpochSlots), + EpochSlots(EpochSlotIndex, EpochSlots), +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +pub struct EpochIncompleteSlots { + pub first: Slot, + pub compressed_list: Vec, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -67,6 +75,7 @@ pub struct EpochSlots { pub root: Slot, pub lowest: Slot, pub slots: BTreeSet, + pub stash: Vec, pub wallclock: u64, } @@ -76,6 +85,7 @@ impl EpochSlots { root: Slot, lowest: Slot, slots: BTreeSet, + stash: Vec, wallclock: u64, ) -> Self { Self { @@ -83,6 +93,7 @@ impl EpochSlots { root, lowest, slots, + stash, wallclock, } } @@ -154,21 +165,21 @@ impl CrdsValue { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, - CrdsData::EpochSlots(vote) => vote.wallclock, + CrdsData::EpochSlots(_, vote) => vote.wallclock, } } pub fn pubkey(&self) -> Pubkey { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, - CrdsData::EpochSlots(slots) => slots.from, + CrdsData::EpochSlots(_, slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { match &self.data { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), - CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()), + CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -193,7 +204,7 @@ impl CrdsValue { pub fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { - CrdsData::EpochSlots(slots) => Some(slots), + CrdsData::EpochSlots(_, slots) => Some(slots), _ => None, } } @@ -277,13 +288,10 @@ mod test { let key = v.clone().vote().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); - let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - Pubkey::default(), + let v = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, - 0, - BTreeSet::new(), - 0, - ))); + EpochSlots::new(Pubkey::default(), 0, 0, BTreeSet::new(), vec![], 0), + )); assert_eq!(v.wallclock(), 0); let key = v.clone().epoch_slots().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); @@ -304,13 +312,10 @@ mod test { )); verify_signatures(&mut v, &keypair, &wrong_keypair); let btreeset: BTreeSet = vec![1, 2, 3, 6, 8].into_iter().collect(); - v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - keypair.pubkey(), - 0, + v = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, - btreeset, - timestamp(), - ))); + EpochSlots::new(keypair.pubkey(), 0, 0, btreeset, vec![], timestamp()), + )); verify_signatures(&mut v, &keypair, &wrong_keypair); } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index d6a84996466c28..a31f22de660184 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -9,11 +9,12 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; +use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ collections::BTreeSet, net::UdpSocket, - ops::Bound::{Excluded, Unbounded}, + ops::Bound::{Included, Unbounded}, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, thread::sleep, @@ -25,6 +26,9 @@ pub const MAX_REPAIR_LENGTH: usize = 512; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; +const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256; +const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512; + pub enum RepairStrategy { RepairRange(RepairSlotRange), RepairAll { @@ -85,17 +89,18 @@ impl RepairService { ) { let serve_repair = ServeRepair::new(cluster_info.clone()); let mut epoch_slots: BTreeSet = BTreeSet::new(); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); - let mut current_root = 0; if let RepairStrategy::RepairAll { ref epoch_schedule, .. } = repair_strategy { - current_root = blockstore.last_root(); + let current_root = blockstore.last_root(); Self::initialize_epoch_slots( id, blockstore, &mut epoch_slots, + &old_incomplete_slots, current_root, epoch_schedule, cluster_info, @@ -127,8 +132,8 @@ impl RepairService { id, new_root, lowest_slot, - &mut current_root, &mut epoch_slots, + &mut old_incomplete_slots, &cluster_info, completed_slots_receiver, ); @@ -292,6 +297,7 @@ impl RepairService { id: Pubkey, blockstore: &Blockstore, slots_in_gossip: &mut BTreeSet, + old_incomplete_slots: &BTreeSet, root: Slot, epoch_schedule: &EpochSchedule, cluster_info: &RwLock, @@ -307,6 +313,7 @@ impl RepairService { root, blockstore.lowest_slot(), slots_in_gossip.clone(), + old_incomplete_slots, ); } @@ -316,44 +323,83 @@ impl RepairService { id: Pubkey, latest_known_root: Slot, lowest_slot: Slot, - prev_root: &mut Slot, - slots_in_gossip: &mut BTreeSet, + completed_slot_cache: &mut BTreeSet, + incomplete_slot_stash: &mut BTreeSet, cluster_info: &RwLock, completed_slots_receiver: &CompletedSlotsReceiver, ) { - // If the latest known root is different, update gossip. - let mut should_update = latest_known_root != *prev_root; + let mut should_update = false; while let Ok(completed_slots) = completed_slots_receiver.try_recv() { for slot in completed_slots { - // If the newly completed slot > root, and the set did not contain this value - // before, we should update gossip. - if slot > latest_known_root { - should_update |= slots_in_gossip.insert(slot); + let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0); + let removed_from_stash = incomplete_slot_stash.remove(&slot); + // If the newly completed slot was not being tracked in stash, and is > last + // slot being tracked in stash, add it to cache. Also, update gossip + if !removed_from_stash && slot >= last_slot_in_stash { + should_update |= completed_slot_cache.insert(slot); } + // If the slot was removed from stash, update gossip + should_update |= removed_from_stash; } } if should_update { - // Filter out everything <= root - if latest_known_root != *prev_root { - *prev_root = latest_known_root; - Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root); + if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER { + Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash); + let lowest_completed_slot_in_cache = + *completed_slot_cache.iter().next().unwrap_or(&0); + Self::prune_incomplete_slot_stash( + incomplete_slot_stash, + lowest_completed_slot_in_cache, + ); } cluster_info.write().unwrap().push_epoch_slots( id, latest_known_root, lowest_slot, - slots_in_gossip.clone(), + completed_slot_cache.clone(), + incomplete_slot_stash, ); } } - fn retain_slots_greater_than_root(slot_set: &mut BTreeSet, root: Slot) { - *slot_set = slot_set - .range((Excluded(&root), Unbounded)) - .cloned() - .collect(); + fn stash_old_incomplete_slots(cache: &mut BTreeSet, stash: &mut BTreeSet) { + if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN { + let mut prev = *cache.iter().next().expect("Expected to find some slot"); + cache.remove(&prev); + while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN { + let next = *cache.iter().next().expect("Expected to find some slot"); + cache.remove(&next); + // Prev slot and next slot are not included in incomplete slot list. + (prev + 1..next).for_each(|slot| { + stash.insert(slot); + }); + prev = next; + } + } + } + + fn prune_incomplete_slot_stash( + stash: &mut BTreeSet, + lowest_completed_slot_in_cache: Slot, + ) { + if let Some(oldest_incomplete_slot) = stash.iter().next() { + // Prune old slots + // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs + // earlier than the new root. But, we prune all the slots that are older than 1 epoch. + // So slots in a batch of half epoch are getting pruned + if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + < lowest_completed_slot_in_cache + { + let oldest_slot_to_retain = + lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH); + *stash = stash + .range((Included(&oldest_slot_to_retain), Unbounded)) + .cloned() + .collect(); + } + } } pub fn join(self) -> thread::Result<()> { @@ -373,7 +419,6 @@ mod test { }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; - use std::sync::mpsc::channel; use std::thread::Builder; #[test] @@ -703,13 +748,14 @@ mod test { node_info.info.clone(), )); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); while completed_slots.len() < num_slots as usize { RepairService::update_epoch_slots( Pubkey::default(), root, blockstore.lowest_slot(), - &mut root.clone(), &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -726,13 +772,12 @@ mod test { Pubkey::default(), root, 0, - &mut 0, &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); expected.insert(num_slots + 2); - RepairService::retain_slots_greater_than_root(&mut expected, root); assert_eq!(completed_slots, expected); writer.join().unwrap(); } @@ -740,95 +785,133 @@ mod test { } #[test] - fn test_update_epoch_slots_new_root() { - let mut current_root = 0; - - let mut completed_slots = BTreeSet::new(); - let node_info = Node::new_localhost_with_pubkey(&Pubkey::default()); - let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair( - node_info.info.clone(), - )); - let my_pubkey = Pubkey::new_rand(); - let (completed_slots_sender, completed_slots_receiver) = channel(); - - // Send a new slot before the root is updated - let newly_completed_slot = 63; - completed_slots_sender - .send(vec![newly_completed_slot]) - .unwrap(); - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root, - 0, - &mut current_root.clone(), - &mut completed_slots, - &cluster_info, - &completed_slots_receiver, - ); - - // We should see epoch state update - let (my_epoch_slots_in_gossip, updated_ts) = { - let r_cluster_info = cluster_info.read().unwrap(); - - let (my_epoch_slots_in_gossip, updated_ts) = r_cluster_info - .get_epoch_state_for_node(&my_pubkey, None) - .clone() - .unwrap(); - - (my_epoch_slots_in_gossip.clone(), updated_ts) - }; - - assert_eq!(my_epoch_slots_in_gossip.root, 0); - assert_eq!(current_root, 0); - assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1); - assert!(my_epoch_slots_in_gossip - .slots - .contains(&newly_completed_slot)); - - // Calling update again with no updates to either the roots or set of completed slots - // should not update gossip - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root, - 0, - &mut current_root, - &mut completed_slots, - &cluster_info, - &completed_slots_receiver, - ); + fn test_stash_old_incomplete_slots() { + let mut cache: BTreeSet = BTreeSet::new(); + let mut stash: BTreeSet = BTreeSet::new(); + + // When cache is empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + + // Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1) + cache.insert(101); + cache.insert(102); + cache.insert(104); + cache.insert(105); + + // Not enough slots in cache. So stash should remain empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + assert_eq!(cache.len(), 4); + + // Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN) + let mut cache: BTreeSet = BTreeSet::new(); + (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64) + .into_iter() + .for_each(|slot| { + cache.insert(slot); + }); + + // Not enough slots in cache. So stash should remain empty. + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + assert_eq!(stash.len(), 0); + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN); + + // Insert 1 more to cross the threshold + cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64); + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + // Stash is still empty, as no missing slots + assert_eq!(stash.len(), 0); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + + // Insert more slots to create a missing slot + let mut cache: BTreeSet = BTreeSet::new(); + cache.insert(0); + (2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2) + .into_iter() + .for_each(|slot| { + cache.insert(slot); + }); + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + + // Stash is not empty + assert!(stash.contains(&1)); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + + // Test multiple missing slots at dispersed locations + let mut cache: BTreeSet = BTreeSet::new(); + (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2) + .into_iter() + .for_each(|slot| { + cache.insert(slot); + }); + + cache.remove(&10); + cache.remove(&11); + + cache.remove(&28); + cache.remove(&29); + + cache.remove(&148); + cache.remove(&149); + cache.remove(&150); + cache.remove(&151); + + RepairService::stash_old_incomplete_slots(&mut cache, &mut stash); + + // Stash is not empty + assert!(stash.contains(&10)); + assert!(stash.contains(&11)); + assert!(stash.contains(&28)); + assert!(stash.contains(&29)); + assert!(stash.contains(&148)); + assert!(stash.contains(&149)); + assert!(stash.contains(&150)); + assert!(stash.contains(&151)); + + assert!(!stash.contains(&147)); + assert!(!stash.contains(&152)); + // It removed some entries from cache + assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1); + (MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2) + .into_iter() + .for_each(|slot| { + let slot: u64 = slot as u64; + assert!(cache.contains(&slot)); + }); + } - assert!(cluster_info - .read() - .unwrap() - .get_epoch_state_for_node(&my_pubkey, Some(updated_ts)) - .is_none()); - - sleep(Duration::from_millis(10)); - // Updating just the root again should update gossip (simulates replay stage updating root - // after a slot has been signaled as completed) - RepairService::update_epoch_slots( - my_pubkey.clone(), - current_root + 1, - 0, - &mut current_root, - &mut completed_slots, - &cluster_info, - &completed_slots_receiver, + #[test] + fn test_prune_incomplete_slot_stash() { + // Prune empty stash + let mut stash: BTreeSet = BTreeSet::new(); + RepairService::prune_incomplete_slot_stash(&mut stash, 0); + assert!(stash.is_empty()); + + // Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH + stash.insert(0); + stash.insert(10); + stash.insert(11); + stash.insert(50); + assert_eq!(stash.len(), 4); + RepairService::prune_incomplete_slot_stash(&mut stash, 100); + assert_eq!(stash.len(), 4); + + // Prune stash with slots > DEFAULT_SLOTS_PER_EPOCH, but < 1.5 * DEFAULT_SLOTS_PER_EPOCH + stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50); + assert_eq!(stash.len(), 5); + RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100); + assert_eq!(stash.len(), 5); + + // Prune stash with slots > 1.5 * DEFAULT_SLOTS_PER_EPOCH + stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2); + assert_eq!(stash.len(), 6); + RepairService::prune_incomplete_slot_stash( + &mut stash, + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1, ); - - let r_cluster_info = cluster_info.read().unwrap(); - - let (my_epoch_slots_in_gossip, _) = r_cluster_info - .get_epoch_state_for_node(&my_pubkey, Some(updated_ts)) - .clone() - .unwrap(); - - // Check the root was updated correctly - assert_eq!(my_epoch_slots_in_gossip.root, 1); - assert_eq!(current_root, 1); - assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1); - assert!(my_epoch_slots_in_gossip - .slots - .contains(&newly_completed_slot)); + assert_eq!(stash.len(), 2); } }