Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 62 additions & 9 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ impl LatestUnprocessedVotes {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();
if let Some(latest_vote) = self.get_entry(pubkey) {

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
Expand All @@ -225,15 +228,24 @@ impl LatestUnprocessedVotes {
}
}
}
return Some(vote);
}
Some(vote)
};

// Should have low lock contention because this is only hit on the first few blocks of startup
// and when a new vote account starts voting.
let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap();
latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
if let Some(latest_vote) = self.get_entry(pubkey) {
with_latest_vote(&latest_vote, vote)
} else {
// Grab write-lock to insert new vote.
match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could get away with just this match, right? But this would mean taking the write lock every time...

Are we only expected to take this path during startup?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah afaict we do not remove keys from the map, so we only write-lock when we receive a vote from a node we hadn't seen before - though @AshwinSekar would be more familiar with when/if we clear entries out of the map.

We don't wanna write-lock everytime, so we have the initial get_entry branch.

Copy link
Copy Markdown

@AshwinSekar AshwinSekar Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not remove keys from the map, and we only write-lock here either on startup or when a validator submits a vote for the first time. the match avoids taking the write lock on the entire map for every new vote insertion

I like the approach you have here, thanks for noticing this.

std::collections::hash_map::Entry::Occupied(entry) => {
with_latest_vote(entry.get(), vote)
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -682,6 +694,47 @@ mod tests {
);
}

#[test]
fn test_update_latest_vote_race() {
// There was a race condition in updating the same pubkey in the hashmap
// when the entry does not initially exist.
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

const NUM_VOTES: usize = 100;
let keypairs = Arc::new(
(0..NUM_VOTES)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect_vec(),
);

// Insert votes in parallel
let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
};

let hdl = Builder::new()
.spawn({
let latest_unprocessed_votes = latest_unprocessed_votes.clone();
let keypairs = keypairs.clone();
move || {
for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}
}
})
.unwrap();

for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}

hdl.join().unwrap();
assert_eq!(NUM_VOTES, latest_unprocessed_votes.len());
}

#[test]
fn test_simulate_threads() {
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
Expand Down