Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions core/src/banking_stage/latest_unprocessed_votes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ impl LatestUnprocessedVotes {
let pubkey = vote.pubkey();
let slot = vote.slot();
let timestamp = vote.timestamp();
if let Some(latest_vote) = self.get_entry(pubkey) {

let with_latest_vote = |latest_vote: &RwLock<LatestValidatorVotePacket>,
vote: LatestValidatorVotePacket|
-> Option<LatestValidatorVotePacket> {
let (latest_slot, latest_timestamp) = latest_vote
.read()
.map(|vote| (vote.slot(), vote.timestamp()))
Expand All @@ -226,14 +229,32 @@ impl LatestUnprocessedVotes {
}
}
}
return Some(vote);
}
Some(vote)
};

<<<<<<< HEAD
// Should have low lock contention because this is only hit on the first few blocks of startup
// and when a new vote account starts voting.
let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap();
latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote)));
None
=======
if let Some(latest_vote) = self.get_entry(pubkey) {
with_latest_vote(&latest_vote, vote)
} else {
// Grab write-lock to insert new vote.
match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) {
std::collections::hash_map::Entry::Occupied(entry) => {
with_latest_vote(entry.get(), vote)
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
}
}
}
>>>>>>> d06a3aceb2 (fix race condition on vote count (#1762))
}

#[cfg(test)]
Expand Down Expand Up @@ -678,6 +699,47 @@ mod tests {
);
}

#[test]
fn test_update_latest_vote_race() {
// There was a race condition in updating the same pubkey in the hashmap
// when the entry does not initially exist.
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

const NUM_VOTES: usize = 100;
let keypairs = Arc::new(
(0..NUM_VOTES)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect_vec(),
);

// Insert votes in parallel
let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes,
keypairs: &Arc<Vec<ValidatorVoteKeypairs>>,
i: usize| {
let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None);
latest_unprocessed_votes.update_latest_vote(vote);
};

let hdl = Builder::new()
.spawn({
let latest_unprocessed_votes = latest_unprocessed_votes.clone();
let keypairs = keypairs.clone();
move || {
for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}
}
})
.unwrap();

for i in 0..NUM_VOTES {
insert_vote(&latest_unprocessed_votes, &keypairs, i);
}

hdl.join().unwrap();
assert_eq!(NUM_VOTES, latest_unprocessed_votes.len());
}

#[test]
fn test_simulate_threads() {
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
Expand Down