From 1d8346a7054ef6646f9eac88033c94ca8e8ea6f8 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 19 Feb 2020 20:24:09 -0800 Subject: [PATCH] Bitwise compress incomplete epoch slots (#8341) (cherry picked from commit ea8d9d1aea2df8e82f319b070b6ccaa3db6c79e8) # Conflicts: # core/src/cluster_info.rs # core/src/crds_value.rs --- core/src/cluster_info.rs | 171 +++++++++++++++++++++++++++++++++++++++ core/src/crds_value.rs | 46 +++++++++-- 2 files changed, 209 insertions(+), 8 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 67d8d38d8bfb2c..937bcf51d48e57 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,6 +12,7 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::crds_value::EpochIncompleteSlots; use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ @@ -68,6 +69,8 @@ const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HE /// The largest protocol header size const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; +const NUM_BITS_PER_BYTE: u64 = 8; + #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { NoPeers, @@ -307,10 +310,91 @@ impl ClusterInfo { ) } +<<<<<<< HEAD pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet) { let now = timestamp(); let entry = CrdsValue::new_signed( CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)), +======= + pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> EpochIncompleteSlots { + if !incomplete_slots.is_empty() { + let first_slot = incomplete_slots + .iter() + .next() + .expect("expected to find at least one slot"); + let last_slot = incomplete_slots + .iter() + .next_back() + .expect("expected to find last slot"); + let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1; + let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 { + 1 + } else { + 0 + } + num_uncompressed_bits / NUM_BITS_PER_BYTE; + let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize]; + incomplete_slots.iter().for_each(|slot| { + let offset_from_first_slot = slot.saturating_sub(*first_slot); + let index = offset_from_first_slot / NUM_BITS_PER_BYTE; + let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; + uncompressed[index as usize] |= 1 << bit_index; + }); + if let Ok(compressed) = uncompressed + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>() + { + return EpochIncompleteSlots { + first: *first_slot, + compressed_list: compressed, + }; + } + } + EpochIncompleteSlots::default() + } + + pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); + + if let Ok(decompressed) = slots + .compressed_list + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>() + { + decompressed.iter().enumerate().for_each(|(i, val)| { + if *val != 0 { + (0..8).for_each(|bit_index| { + if (1 << bit_index & *val) != 0 { + let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index; + old_incomplete_slots.insert(slot as u64); + } + }) + } + }) + } + + old_incomplete_slots + } + + pub fn push_epoch_slots( + &mut self, + id: Pubkey, + root: Slot, + min: Slot, + slots: BTreeSet, + incomplete_slots: &BTreeSet, + ) { + let compressed = Self::compress_incomplete_slots(incomplete_slots); + let now = timestamp(); + let entry = CrdsValue::new_signed( + CrdsData::EpochSlots( + 0, + EpochSlots::new(id, root, min, slots, vec![compressed], now), + ), +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) &self.keypair, ); self.gossip @@ -2120,6 +2204,7 @@ mod tests { for i in 0..128 { btree_slots.insert(i); } +<<<<<<< HEAD let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { from: Pubkey::default(), root: 0, @@ -2127,6 +2212,19 @@ mod tests { slots: btree_slots, wallclock: 0, })); +======= + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: btree_slots, + stash: vec![], + wallclock: 0, + }, + )); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) test_split_messages(value); } @@ -2137,6 +2235,7 @@ mod tests { let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; +<<<<<<< HEAD let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { from: Pubkey::default(), root: 0, @@ -2144,6 +2243,19 @@ mod tests { slots: BTreeSet::new(), wallclock: 0, })); +======= + let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: BTreeSet::new(), + stash: vec![], + wallclock: 0, + }, + )); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) let mut i = 0; while value.size() <= desired_size { @@ -2155,6 +2267,7 @@ mod tests { desired_size ); } +<<<<<<< HEAD value.data = CrdsData::EpochSlots(EpochSlots { from: Pubkey::default(), root: 0, @@ -2162,6 +2275,19 @@ mod tests { slots, wallclock: 0, }); +======= + value.data = CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots, + stash: vec![], + wallclock: 0, + }, + ); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) i += 1; } let split = ClusterInfo::split_gossip_messages(vec![value.clone()]); @@ -2301,6 +2427,7 @@ mod tests { let other_node_pubkey = Pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); cluster_info.insert_info(other_node.clone()); +<<<<<<< HEAD let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( other_node_pubkey, peer_root, @@ -2308,6 +2435,19 @@ mod tests { BTreeSet::new(), timestamp(), ))); +======= + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots::new( + other_node_pubkey, + peer_root, + peer_lowest, + BTreeSet::new(), + vec![], + timestamp(), + ), + )); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) let _ = cluster_info.gossip.crds.insert(value, timestamp()); } // only half the visible peers should be eligible to serve this repair @@ -2367,4 +2507,35 @@ mod tests { serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; PACKET_DATA_SIZE - (protocol_size - filter_size) } +<<<<<<< HEAD +======= + + #[test] + fn test_compress_incomplete_slots() { + let mut incomplete_slots: BTreeSet = BTreeSet::new(); + + assert_eq!( + EpochIncompleteSlots::default(), + ClusterInfo::compress_incomplete_slots(&incomplete_slots) + ); + + incomplete_slots.insert(100); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(104); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(80); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); + } +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index e6c87022bc51bb..fadaf55e42f9f3 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -15,6 +15,8 @@ use std::{ pub type VoteIndex = u8; pub const MAX_VOTES: VoteIndex = 32; +pub type EpochSlotIndex = u8; + /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct CrdsValue { @@ -58,7 +60,13 @@ impl Signable for CrdsValue { pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), - EpochSlots(EpochSlots), + EpochSlots(EpochSlotIndex, EpochSlots), +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +pub struct EpochIncompleteSlots { + pub first: Slot, + pub compressed_list: Vec, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -67,6 +75,10 @@ pub struct EpochSlots { pub root: Slot, pub lowest: Slot, pub slots: BTreeSet, +<<<<<<< HEAD +======= + pub stash: Vec, +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) pub wallclock: u64, } @@ -76,6 +88,10 @@ impl EpochSlots { root: Slot, lowest: Slot, slots: BTreeSet, +<<<<<<< HEAD +======= + stash: Vec, +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) wallclock: u64, ) -> Self { Self { @@ -83,6 +99,10 @@ impl EpochSlots { root, lowest, slots, +<<<<<<< HEAD +======= + stash, +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) wallclock, } } @@ -154,21 +174,21 @@ impl CrdsValue { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, - CrdsData::EpochSlots(vote) => vote.wallclock, + CrdsData::EpochSlots(_, vote) => vote.wallclock, } } pub fn pubkey(&self) -> Pubkey { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, - CrdsData::EpochSlots(slots) => slots.from, + CrdsData::EpochSlots(_, slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { match &self.data { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), - CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()), + CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -193,7 +213,7 @@ impl CrdsValue { pub fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { - CrdsData::EpochSlots(slots) => Some(slots), + CrdsData::EpochSlots(_, slots) => Some(slots), _ => None, } } @@ -277,13 +297,16 @@ mod test { let key = v.clone().vote().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); - let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - Pubkey::default(), - 0, + let v = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, +<<<<<<< HEAD BTreeSet::new(), 0, ))); +======= + EpochSlots::new(Pubkey::default(), 0, 0, BTreeSet::new(), vec![], 0), + )); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) assert_eq!(v.wallclock(), 0); let key = v.clone().epoch_slots().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); @@ -304,6 +327,7 @@ mod test { )); verify_signatures(&mut v, &keypair, &wrong_keypair); let btreeset: BTreeSet = vec![1, 2, 3, 6, 8].into_iter().collect(); +<<<<<<< HEAD v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( keypair.pubkey(), 0, @@ -311,6 +335,12 @@ mod test { btreeset, timestamp(), ))); +======= + v = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots::new(keypair.pubkey(), 0, 0, btreeset, vec![], timestamp()), + )); +>>>>>>> ea8d9d1ae... Bitwise compress incomplete epoch slots (#8341) verify_signatures(&mut v, &keypair, &wrong_keypair); }