Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 124 additions & 40 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,31 @@ impl ClusterInfo {
}
}

fn find_vote_index_to_evict(&self, should_evict_vote: impl Fn(&Vote) -> bool) -> u8 {
let self_pubkey = self.id();
let mut num_crds_votes = 0;
let vote_index = {
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote: &CrdsData = gossip_crds.get(&vote)?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a good way to address this, but after a restart we could attempt to refresh a vote before our gossip crds is fully populated. In this case we could be evicting a later vote.

I think it's fine anyway, since with VoteStateUpdate only the last vote matters most of the time, but I just wanted to point out that we might not always evict the "oldest" vote here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it will be some vote "older" than the newest vote because to evict, you have to have >=32 votes in your gossip table

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I think that's true as long as the tower file isn't corrupt which we shouldn't worry about.

num_crds_votes += 1;
match &vote {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
Some((vote.wallclock, ix))
}
CrdsData::Vote(_, _) => None,
_ => panic!("this should not happen!"),
}
})
.min() // Boot the oldest evicted vote by wallclock.
.map(|(_ /*wallclock*/, ix)| ix)
};
vote_index.unwrap_or(num_crds_votes)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm should this be:
if num_crds_votes < MAX_LOCKOUT_HISTORY { num_crds_votes } else { vote_index}.
that way if the crds table isn't full we prioritize not evicting a vote?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this, but wanted to keep the logic as similar as possible to the existing logic from push_vote, where we evict a vote if it doesn't exist in tower, even if there's room in the crds table. Maybe that's slightly more efficient in gossip than keeping around an unnecessary vote.

}

pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b));
// Find a crds vote which is evicted from the tower, and recycle its
Expand All @@ -1057,8 +1082,7 @@ impl ClusterInfo {
// gossip.
// TODO: When there are more than one vote evicted from the tower, only
// one crds vote is overwritten here. Decide what to do with the rest.
let mut num_crds_votes = 0;
let self_pubkey = self.id();

// Returns true if the tower does not contain the vote.slot.
let should_evict_vote = |vote: &Vote| -> bool {
match vote.slot() {
Expand All @@ -1069,26 +1093,7 @@ impl ClusterInfo {
}
}
};
let vote_index = {
let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote: &CrdsData = gossip_crds.get(&vote)?;
num_crds_votes += 1;
match &vote {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
Some((vote.wallclock, ix))
}
CrdsData::Vote(_, _) => None,
_ => panic!("this should not happen!"),
}
})
.min() // Boot the oldest evicted vote by wallclock.
.map(|(_ /*wallclock*/, ix)| ix)
};
let vote_index = vote_index.unwrap_or(num_crds_votes);
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
let (_, vote, hash, _) = vote_parser::parse_vote_transaction(&vote).unwrap();
panic!(
Expand All @@ -1102,7 +1107,7 @@ impl ClusterInfo {
self.push_vote_at_index(vote, vote_index);
}

pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) {
pub fn refresh_vote(&self, refresh_vote: Transaction, refresh_vote_slot: Slot) {
let vote_index = {
let self_pubkey = self.id();
let gossip_crds =
Expand All @@ -1116,7 +1121,7 @@ impl ClusterInfo {
panic!("this should not happen!");
};
match prev_vote.slot() {
Some(prev_vote_slot) => prev_vote_slot == vote_slot,
Some(prev_vote_slot) => prev_vote_slot == refresh_vote_slot,
None => {
error!("crds vote with no slots!");
false
Expand All @@ -1125,13 +1130,27 @@ impl ClusterInfo {
})
};

// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to wait for your oldest vote to propagate back to you.
//
// We don't write to an arbitrary index, because it may replace one of this validator's
// existing votes on the network.
if let Some(vote_index) = vote_index {
self.push_vote_at_index(vote, vote_index);
self.push_vote_at_index(refresh_vote, vote_index);
} else {
// If you don't see a vote with the same slot yet, this means you probably
// restarted, and need to repush and evict the oldest vote
let should_evict_vote = |vote: &Vote| -> bool {
vote.slot()
.map(|slot| refresh_vote_slot > slot)
.unwrap_or(true)
};
let vote_index = self.find_vote_index_to_evict(should_evict_vote);
if (vote_index as usize) >= MAX_LOCKOUT_HISTORY {
warn!(
"trying to refresh slot {} but all votes in gossip table are for newer slots",
refresh_vote_slot,
);
return;
}
self.push_vote_at_index(refresh_vote, vote_index);
}
}

Expand Down Expand Up @@ -3673,6 +3692,77 @@ mod tests {
.unwrap();
}

#[test]
fn test_refresh_vote_eviction() {
    // Builds an unsigned, payer-less vote transaction for a single `slot`,
    // using fresh unique keys and a fresh unique hash on every call.
    fn new_vote_tx(slot: Slot) -> Transaction {
        let vote = Vote::new(vec![slot], Hash::new_unique());
        let vote_ix = vote_instruction::vote(
            &Pubkey::new_unique(), // vote_pubkey
            &Pubkey::new_unique(), // authorized_voter_pubkey
            vote,
        );
        Transaction::new_with_payer(
            &[vote_ix], // instructions
            None,       // payer
        )
    }

    let keypair = Arc::new(Keypair::new());
    let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
    let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);

    // Push MAX_LOCKOUT_HISTORY votes into gossip, one for each slot in
    // [lowest_vote_slot, lowest_vote_slot + MAX_LOCKOUT_HISTORY).
    let lowest_vote_slot: Slot = 1;
    let max_vote_slot = lowest_vote_slot + MAX_LOCKOUT_HISTORY as Slot;
    let mut first_vote = None;
    let mut prev_votes = vec![];
    for slot in lowest_vote_slot..max_vote_slot {
        // The tower passed to `push_vote` keeps growing, so every previously
        // pushed slot is still in it when the next vote is pushed.
        prev_votes.push(slot);
        let vote_tx = new_vote_tx(slot);
        if first_vote.is_none() {
            // Remember the very first vote pushed; it is the one expected
            // to be evicted at the end of the test.
            first_vote = Some(vote_tx.clone());
        }
        cluster_info.push_vote(&prev_votes, vote_tx);
    }

    let initial_votes = cluster_info.get_votes(&mut Cursor::default());
    assert_eq!(initial_votes.len(), MAX_LOCKOUT_HISTORY);

    // Refreshing a vote for a slot lower than every vote in gossip must be
    // a no-op.
    let refresh_slot = lowest_vote_slot - 1;
    let refresh_tx = new_vote_tx(refresh_slot);
    cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
    let current_votes = cluster_info.get_votes(&mut Cursor::default());
    assert_eq!(initial_votes, current_votes);
    assert!(!current_votes.contains(&refresh_tx));

    // Refreshing a vote for a slot higher than every vote in gossip must
    // evict one of the existing votes to make room for it.
    let refresh_slot = max_vote_slot + 1;
    let refresh_tx = new_vote_tx(refresh_slot);
    cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);

    // The first vote pushed is the one evicted: the table still holds
    // MAX_LOCKOUT_HISTORY votes, now including the refreshed one.
    let votes = cluster_info.get_votes(&mut Cursor::default());
    assert_eq!(votes.len(), MAX_LOCKOUT_HISTORY);
    assert!(votes.contains(&refresh_tx));
    assert!(!votes.contains(&first_vote.unwrap()));
}

#[test]
fn test_refresh_vote() {
let keypair = Arc::new(Keypair::new());
Expand All @@ -3697,8 +3787,9 @@ mod tests {
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![unrefresh_tx.clone()]);

// Now construct vote for the slot to be refreshed later
let refresh_slot = 7;
// Now construct vote for the slot to be refreshed later. Has to be less than the `unrefresh_slot`,
// otherwise it will evict that slot
let refresh_slot = unrefresh_slot - 1;
let refresh_tower = vec![1, 3, unrefresh_slot, refresh_slot];
let refresh_vote = Vote::new(refresh_tower.clone(), Hash::new_unique());
let refresh_ix = vote_instruction::vote(
Expand All @@ -3712,19 +3803,12 @@ mod tests {
);

// Trying to refresh vote when it doesn't yet exist in gossip
// shouldn't add the vote
// should add the vote without eviction if there is room in the gossip table.
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
let votes = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 1);
assert!(votes.contains(&unrefresh_tx));

// Push the new vote for `refresh_slot`
cluster_info.push_vote(&refresh_tower, refresh_tx.clone());

// Should be two votes in gossip
let votes = cluster_info.get_votes(&mut Cursor::default());
cursor = Cursor::default();
let votes = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&refresh_tx));
Expand Down