diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index a62e5bf9b3e455..2d2d1a4762d63e 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -16,7 +16,10 @@ use { std::{ collections::HashMap, ops::DerefMut, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, RwLock, + }, }, }; @@ -144,6 +147,7 @@ pub(crate) struct VoteBatchInsertionMetrics { #[derive(Debug, Default)] pub struct LatestUnprocessedVotes { latest_votes_per_pubkey: RwLock>>>, + num_unprocessed_votes: AtomicUsize, } impl LatestUnprocessedVotes { @@ -151,14 +155,8 @@ impl LatestUnprocessedVotes { Self::default() } - /// Expensive because this involves iterating through and locking every unprocessed vote pub fn len(&self) -> usize { - self.latest_votes_per_pubkey - .read() - .unwrap() - .values() - .filter(|lock| !lock.read().unwrap().is_vote_taken()) - .count() + self.num_unprocessed_votes.load(Ordering::Relaxed) } pub fn is_empty(&self) -> bool { @@ -220,6 +218,7 @@ impl LatestUnprocessedVotes { if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { let old_vote = std::mem::replace(latest_vote.deref_mut(), vote); if old_vote.is_vote_taken() { + self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); return None; } else { return Some(old_vote); @@ -233,6 +232,7 @@ impl LatestUnprocessedVotes { // and when a new vote account starts voting. let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote))); + self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed); None } @@ -319,7 +319,10 @@ impl LatestUnprocessedVotes { .filter_map(|pubkey| { self.get_entry(pubkey).and_then(|lock| { let mut latest_vote = lock.write().unwrap(); - latest_vote.take_vote() + latest_vote.take_vote().map(|vote| { + self.num_unprocessed_votes.fetch_sub(1, Ordering::Relaxed); + vote + }) }) }) .collect_vec() @@ -335,8 +338,8 @@ impl LatestUnprocessedVotes { .filter(|lock| lock.read().unwrap().is_forwarded()) .for_each(|lock| { let mut vote = lock.write().unwrap(); - if vote.is_forwarded() { - vote.take_vote(); + if vote.is_forwarded() && vote.take_vote().is_some() { + self.num_unprocessed_votes.fetch_sub(1, Ordering::Relaxed); } }); } @@ -726,6 +729,9 @@ mod tests { let mut latest_vote = lock.write().unwrap(); if !latest_vote.is_vote_taken() { latest_vote.take_vote(); + latest_unprocessed_votes_tpu + .num_unprocessed_votes + .fetch_sub(1, Ordering::Relaxed); } }); }