diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index a62e5bf9b3e455..6e06ab04110192 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -205,7 +205,10 @@ impl LatestUnprocessedVotes { let pubkey = vote.pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); - if let Some(latest_vote) = self.get_entry(pubkey) { + + let with_latest_vote = |latest_vote: &RwLock, + vote: LatestValidatorVotePacket| + -> Option { let (latest_slot, latest_timestamp) = latest_vote .read() .map(|vote| (vote.slot(), vote.timestamp())) @@ -226,14 +229,32 @@ impl LatestUnprocessedVotes { } } } - return Some(vote); - } + Some(vote) + }; +<<<<<<< HEAD // Should have low lock contention because this is only hit on the first few blocks of startup // and when a new vote account starts voting. let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote))); None +======= + if let Some(latest_vote) = self.get_entry(pubkey) { + with_latest_vote(&latest_vote, vote) + } else { + // Grab write-lock to insert new vote. + match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) { + std::collections::hash_map::Entry::Occupied(entry) => { + with_latest_vote(entry.get(), vote) + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(Arc::new(RwLock::new(vote))); + self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); + None + } + } + } +>>>>>>> d06a3aceb2 (fix race condition on vote count (#1762)) } #[cfg(test)] @@ -678,6 +699,47 @@ mod tests { ); } + #[test] + fn test_update_latest_vote_race() { + // There was a race condition in updating the same pubkey in the hashmap + // when the entry does not initially exist. + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + + const NUM_VOTES: usize = 100; + let keypairs = Arc::new( + (0..NUM_VOTES) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect_vec(), + ); + + // Insert votes in parallel + let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, + keypairs: &Arc>, + i: usize| { + let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None); + latest_unprocessed_votes.update_latest_vote(vote); + }; + + let hdl = Builder::new() + .spawn({ + let latest_unprocessed_votes = latest_unprocessed_votes.clone(); + let keypairs = keypairs.clone(); + move || { + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + } + }) + .unwrap(); + + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + + hdl.join().unwrap(); + assert_eq!(NUM_VOTES, latest_unprocessed_votes.len()); + } + #[test] fn test_simulate_threads() { let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());