diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4aff63afdef226..aba3ae5c2f1be1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -18,7 +18,7 @@ use crate::contact_info::ContactInfo; use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; -use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote}; +use crate::crds_value::{CrdsValue, CrdsValueLabel, Votes}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::repair_service::RepairType; use crate::result::Result; @@ -62,6 +62,9 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; +// Max vote cache size +const MAX_CACHED_VOTES: usize = 16; + #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { NoPeers, @@ -80,6 +83,8 @@ pub struct ClusterInfo { gossip_leader_id: Pubkey, /// The network entrypoint entrypoint: Option, + /// A Vote cache + vote_cache: Vec, } #[derive(Default, Clone)] @@ -180,6 +185,7 @@ impl ClusterInfo { keypair, gossip_leader_id: Pubkey::default(), entrypoint: None, + vote_cache: Vec::new(), }; let id = contact_info.id; me.gossip.set_self(&id); @@ -286,9 +292,15 @@ impl ClusterInfo { } pub fn push_vote(&mut self, vote: Transaction) { + if self.vote_cache.len() == MAX_CACHED_VOTES { + self.vote_cache.remove(0); + } + self.vote_cache.push(vote); + let now = timestamp(); - let vote = Vote::new(&self.id(), vote, now); + let vote = Votes::new(&self.id(), self.vote_cache.clone(), now); let mut entry = CrdsValue::Vote(vote); + entry.sign(&self.keypair); self.gossip.process_push_message(&[entry], now); } @@ -306,13 +318,21 @@ impl ClusterInfo { .values() .filter(|x| x.local_timestamp > since) .filter_map(|x| { - x.value - .vote() - .map(|v| (x.local_timestamp, v.transaction.clone())) + x.value.votes().map(|votes| { + ( + x.local_timestamp, + votes + .transactions + .iter() + .cloned() + .map(|tx| tx) + .collect::>(), + ) + }) }) .collect(); let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since); - let txs: Vec = votes.into_iter().map(|x| x.1).collect(); + let txs: Vec = votes.into_iter().map(|x| x.1).flatten().collect(); (txs, max_ts) } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index c885722a1f5bae..4556ad139578b3 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -11,18 +11,18 @@ pub enum CrdsValue { /// * Merge Strategy - Latest wallclock is picked ContactInfo(ContactInfo), /// * Merge Strategy - Latest wallclock is picked - Vote(Vote), + Vote(Votes), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct Vote { +pub struct Votes { pub from: Pubkey, - pub transaction: Transaction, + pub transactions: Vec, pub signature: Signature, pub wallclock: u64, } -impl Signable for Vote { +impl Signable for Votes { fn pubkey(&self) -> Pubkey { self.from } @@ -30,11 +30,11 @@ impl Signable for Vote { fn signable_data(&self) -> Vec { #[derive(Serialize)] struct SignData { - transaction: Transaction, + transaction: Vec, wallclock: u64, } let data = SignData { - transaction: self.transaction.clone(), + transaction: self.transactions.clone(), wallclock: self.wallclock, }; serialize(&data).expect("unable to serialize Vote") @@ -75,11 +75,11 @@ impl CrdsValueLabel { } } -impl Vote { - pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self { - Vote { +impl Votes { + pub fn new(from: &Pubkey, transactions: Vec, wallclock: u64) -> Self { + Votes { from: *from, - transaction, + transactions, signature: Signature::default(), wallclock, } @@ -110,9 +110,9 @@ impl CrdsValue { _ => None, } } - pub fn vote(&self) -> Option<&Vote> { + pub fn votes(&self) -> Option<&Votes> { match self { - CrdsValue::Vote(vote) => Some(vote), + CrdsValue::Vote(votes) => Some(votes), _ => None, } } @@ -186,9 +186,9 @@ mod test { let key = v.clone().contact_info().unwrap().id; assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); - let v = CrdsValue::Vote(Vote::new(&Pubkey::default(), test_tx(), 0)); + let v = CrdsValue::Vote(Votes::new(&Pubkey::default(), vec![test_tx()], 0)); assert_eq!(v.wallclock(), 0); - let key = v.clone().vote().unwrap().from; + let key = v.clone().votes().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(key)); } #[test] @@ -198,7 +198,7 @@ mod test { let mut v = CrdsValue::ContactInfo(ContactInfo::new_localhost(&keypair.pubkey(), timestamp())); verify_signatures(&mut v, &keypair, &wrong_keypair); - v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp())); + v = CrdsValue::Vote(Votes::new(&keypair.pubkey(), vec![test_tx()], timestamp())); verify_signatures(&mut v, &keypair, &wrong_keypair); }