Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::contact_info::ContactInfo;
use crate::crds_gossip::CrdsGossip;
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote};
use crate::crds_value::{CrdsValue, CrdsValueLabel, Votes};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType;
use crate::result::Result;
Expand Down Expand Up @@ -62,6 +62,9 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;

// Max vote cache size
const MAX_CACHED_VOTES: usize = 16;

#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
NoPeers,
Expand All @@ -80,6 +83,8 @@ pub struct ClusterInfo {
gossip_leader_id: Pubkey,
/// The network entrypoint
entrypoint: Option<ContactInfo>,
/// A Vote cache
vote_cache: Vec<Transaction>,
}

#[derive(Default, Clone)]
Expand Down Expand Up @@ -180,6 +185,7 @@ impl ClusterInfo {
keypair,
gossip_leader_id: Pubkey::default(),
entrypoint: None,
vote_cache: Vec::new(),
};
let id = contact_info.id;
me.gossip.set_self(&id);
Expand Down Expand Up @@ -286,9 +292,15 @@ impl ClusterInfo {
}

pub fn push_vote(&mut self, vote: Transaction) {
if self.vote_cache.len() == MAX_CACHED_VOTES {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe while len >= MAX remove(0)?
If push vote is ever called from two threads, it can go over MAX votes, and will never recover

self.vote_cache.remove(0);
}
self.vote_cache.push(vote);

let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
let vote = Votes::new(&self.id(), self.vote_cache.clone(), now);
let mut entry = CrdsValue::Vote(vote);

entry.sign(&self.keypair);
self.gossip.process_push_message(&[entry], now);
}
Expand All @@ -306,13 +318,21 @@ impl ClusterInfo {
.values()
.filter(|x| x.local_timestamp > since)
.filter_map(|x| {
x.value
.vote()
.map(|v| (x.local_timestamp, v.transaction.clone()))
x.value.votes().map(|votes| {
(
x.local_timestamp,
votes
.transactions
.iter()
.cloned()
.map(|tx| tx)
.collect::<Vec<_>>(),
)
})
})
.collect();
let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since);
let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).collect();
let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).flatten().collect();
(txs, max_ts)
}

Expand Down
30 changes: 15 additions & 15 deletions core/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,30 @@ pub enum CrdsValue {
/// * Merge Strategy - Latest wallclock is picked
ContactInfo(ContactInfo),
/// * Merge Strategy - Latest wallclock is picked
Vote(Vote),
Vote(Votes),
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Vote {
pub struct Votes {
pub from: Pubkey,
pub transaction: Transaction,
pub transactions: Vec<Transaction>,
pub signature: Signature,
pub wallclock: u64,
}

impl Signable for Vote {
impl Signable for Votes {
fn pubkey(&self) -> Pubkey {
self.from
}

fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
transaction: Transaction,
transaction: Vec<Transaction>,
wallclock: u64,
}
let data = SignData {
transaction: self.transaction.clone(),
transaction: self.transactions.clone(),
wallclock: self.wallclock,
};
serialize(&data).expect("unable to serialize Vote")
Expand Down Expand Up @@ -75,11 +75,11 @@ impl CrdsValueLabel {
}
}

impl Vote {
pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self {
Vote {
impl Votes {
pub fn new(from: &Pubkey, transactions: Vec<Transaction>, wallclock: u64) -> Self {
Votes {
from: *from,
transaction,
transactions,
signature: Signature::default(),
wallclock,
}
Expand Down Expand Up @@ -110,9 +110,9 @@ impl CrdsValue {
_ => None,
}
}
pub fn vote(&self) -> Option<&Vote> {
pub fn votes(&self) -> Option<&Votes> {
match self {
CrdsValue::Vote(vote) => Some(vote),
CrdsValue::Vote(votes) => Some(votes),
_ => None,
}
}
Expand Down Expand Up @@ -186,9 +186,9 @@ mod test {
let key = v.clone().contact_info().unwrap().id;
assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key));

let v = CrdsValue::Vote(Vote::new(&Pubkey::default(), test_tx(), 0));
let v = CrdsValue::Vote(Votes::new(&Pubkey::default(), vec![test_tx()], 0));
assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().from;
let key = v.clone().votes().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::Vote(key));
}
#[test]
Expand All @@ -198,7 +198,7 @@ mod test {
let mut v =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&keypair.pubkey(), timestamp()));
verify_signatures(&mut v, &keypair, &wrong_keypair);
v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp()));
v = CrdsValue::Vote(Votes::new(&keypair.pubkey(), vec![test_tx()], timestamp()));
verify_signatures(&mut v, &keypair, &wrong_keypair);
}

Expand Down