diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 092a23c72bded5..28d669900e6ddd 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -31,7 +31,10 @@ use { solana_transaction::versioned::VersionedTransaction, std::{ cmp, - sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, Mutex, RwLock, + }, time::Instant, }, thiserror::Error, @@ -169,7 +172,7 @@ impl PohRecorderMetrics { pub struct PohRecorder { pub(crate) poh: Arc>, - tick_height: u64, + tick_height: SharedTickHeight, clear_bank_signal: Option>, start_bank: Arc, // parent slot start_bank_active_descendants: Vec, @@ -184,7 +187,7 @@ pub struct PohRecorder { /// the `working_bank` field of this struct. shared_working_bank: SharedWorkingBank, working_bank_sender: Sender, - leader_first_tick_height: Option, + leader_first_tick_height: SharedLeaderFirstTickHeight, leader_last_tick_height: u64, // zero if none grace_ticks: u64, blockstore: Arc, @@ -263,7 +266,7 @@ impl PohRecorder { ( Self { poh, - tick_height, + tick_height: SharedTickHeight::new(tick_height), tick_cache: vec![], working_bank: None, shared_working_bank: SharedWorkingBank::empty(), @@ -272,7 +275,9 @@ impl PohRecorder { start_bank, start_bank_active_descendants: vec![], start_tick_height: tick_height + 1, - leader_first_tick_height, + leader_first_tick_height: SharedLeaderFirstTickHeight::new( + leader_first_tick_height, + ), leader_last_tick_height, grace_ticks, blockstore, @@ -302,7 +307,8 @@ impl PohRecorder { let (leader_first_tick_height, leader_last_tick_height, grace_ticks) = Self::compute_leader_slot_tick_heights(next_leader_slot, self.ticks_per_slot); self.grace_ticks = grace_ticks; - self.leader_first_tick_height = leader_first_tick_height; + self.leader_first_tick_height + .store(leader_first_tick_height); self.leader_last_tick_height = leader_last_tick_height; } @@ -333,6 +339,7 @@ impl PohRecorder { self.metrics.flush_cache_no_tick_us += flush_cache_us; flush_cache_res?; + let tick_height = self.tick_height(); // cannot change until next loop iteration. let working_bank = self .working_bank .as_mut() @@ -366,7 +373,7 @@ impl PohRecorder { hash: entry.hash, transactions, }, - self.tick_height, // `record_batches` guarantees that mixins are **not** split across ticks. + tick_height, // `record_batches` guarantees that mixins are **not** split across ticks. ), ))); self.metrics.send_entry_us += send_batches_us; @@ -404,10 +411,10 @@ impl PohRecorder { self.metrics.tick_lock_contention_us += tick_lock_contention_us; if let Some(poh_entry) = poh_entry { - self.tick_height += 1; - trace!("tick_height {}", self.tick_height); + self.tick_height.increment(); + trace!("tick_height {}", self.tick_height()); - if self.leader_first_tick_height.is_none() { + if self.leader_first_tick_height.load().is_none() { return; } @@ -417,7 +424,7 @@ impl PohRecorder { hash: poh_entry.hash, transactions: vec![], }, - self.tick_height, + self.tick_height(), )); let (_flush_res, flush_cache_and_tick_us) = measure_us!(self.flush_cache(true)); @@ -486,7 +493,8 @@ impl PohRecorder { let (leader_first_tick_height, leader_last_tick_height, grace_ticks) = Self::compute_leader_slot_tick_heights(next_leader_slot, self.ticks_per_slot); self.grace_ticks = grace_ticks; - self.leader_first_tick_height = leader_first_tick_height; + self.leader_first_tick_height + .store(leader_first_tick_height); self.leader_last_tick_height = leader_last_tick_height; datapoint_info!( @@ -519,7 +527,7 @@ impl PohRecorder { info!( "reset poh from: {},{},{} to: {},{}", poh_hash, - self.tick_height, + self.tick_height(), self.start_slot(), blockhash, reset_bank.slot() @@ -530,8 +538,9 @@ impl PohRecorder { self.start_bank = reset_bank; self.start_bank_active_descendants = vec![]; } - self.tick_height = (self.start_slot() + 1) * self.ticks_per_slot; - self.start_tick_height = self.tick_height + 1; + self.tick_height + .store((self.start_slot() + 1) * self.ticks_per_slot); + self.start_tick_height = self.tick_height() + 1; } // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height @@ -545,10 +554,10 @@ impl PohRecorder { .working_bank .as_ref() .ok_or(PohRecorderError::MaxHeightReached)?; - if self.tick_height < working_bank.min_tick_height { + if self.tick_height() < working_bank.min_tick_height { return Err(PohRecorderError::MinHeightNotReached); } - if tick && self.tick_height == working_bank.min_tick_height { + if tick && self.tick_height() == working_bank.min_tick_height { return Err(PohRecorderError::MinHeightNotReached); } @@ -578,7 +587,7 @@ impl PohRecorder { } } } - if self.tick_height >= working_bank.max_tick_height { + if self.tick_height() >= working_bank.max_tick_height { info!( "poh_record: max_tick_height {} reached, clearing working_bank {}", working_bank.max_tick_height, @@ -605,9 +614,10 @@ impl PohRecorder { self.has_bank() || self .leader_first_tick_height + .load() .is_some_and(|leader_first_tick_height| { - self.tick_height + within_next_n_ticks >= leader_first_tick_height - && self.tick_height <= self.leader_last_tick_height + self.tick_height() + within_next_n_ticks >= leader_first_tick_height + && self.tick_height() <= self.leader_last_tick_height }) } @@ -625,7 +635,7 @@ impl PohRecorder { // bank and generally indicates what tick height has already been // reached so use the next tick height to determine which slot poh is // ticking through. - let next_tick_height = self.tick_height.saturating_add(1); + let next_tick_height = self.tick_height().saturating_add(1); self.slot_for_tick_height(next_tick_height) } @@ -661,7 +671,15 @@ impl PohRecorder { } pub fn tick_height(&self) -> u64 { - self.tick_height + self.tick_height.load() + } + + pub fn shared_tick_height(&self) -> SharedTickHeight { + self.tick_height.clone() + } + + pub fn shared_leader_first_tick_height(&self) -> SharedLeaderFirstTickHeight { + self.leader_first_tick_height.clone() } pub fn ticks_per_slot(&self) -> u64 { @@ -685,15 +703,15 @@ impl PohRecorder { pub fn reached_leader_slot(&self, my_pubkey: &Pubkey) -> PohLeaderStatus { trace!( "tick_height {}, start_tick_height {}, leader_first_tick_height {:?}, grace_ticks {}, has_bank {}", - self.tick_height, + self.tick_height(), self.start_tick_height, - self.leader_first_tick_height, + self.leader_first_tick_height.load(), self.grace_ticks, self.has_bank() ); let current_poh_slot = self.current_poh_slot(); - let Some(leader_first_tick_height) = self.leader_first_tick_height else { + let Some(leader_first_tick_height) = self.leader_first_tick_height.load() else { // No next leader slot, so no leader slot has been reached. return PohLeaderStatus::NotReached; }; @@ -730,12 +748,12 @@ impl PohRecorder { } let ideal_target_tick_height = leader_first_tick_height.saturating_sub(1); - if self.tick_height < ideal_target_tick_height { + if self.tick_height() < ideal_target_tick_height { // We haven't ticked to our leader slot yet. return false; } - if self.tick_height >= ideal_target_tick_height.saturating_add(self.grace_ticks) { + if self.tick_height() >= ideal_target_tick_height.saturating_add(self.grace_ticks) { // We have finished waiting for grace ticks. return true; } @@ -993,11 +1011,17 @@ impl SharedWorkingBank { self.0.load_full() } - fn store(&self, bank: Arc) { + // Mutable access not needed for this function. + // However we use it to guarantee only used when PohRecorder is + // write locked. + fn store(&mut self, bank: Arc) { self.0.store(Some(bank)); } - fn clear(&self) { + // Mutable access not needed for this function. + // However we use it to guarantee only used when PohRecorder is + // write locked. + fn clear(&mut self) { self.0.store(None); } @@ -1006,6 +1030,66 @@ impl SharedWorkingBank { } } +/// Wrapper around a atomic-u64 that prevents modifying outside +/// of `PohRecorder`. +#[derive(Clone)] +pub struct SharedTickHeight(Arc); + +impl SharedTickHeight { + pub fn load(&self) -> u64 { + self.0.load(Ordering::Acquire) + } + + fn new(tick_height: u64) -> Self { + Self(Arc::new(AtomicU64::new(tick_height))) + } + + // Mutable access not needed for this function. + // However we use it to guarantee only used when PohRecorder is + // write locked. + fn store(&mut self, tick_height: u64) { + self.0.store(tick_height, Ordering::Release); + } + + // Mutable access not needed for this function. + // However we use it to guarantee only used when PohRecorder is + // write locked. + fn increment(&mut self) { + self.0.fetch_add(1, Ordering::Release); + } +} + +/// Wrapper around a atomic-u64 that may be None. +// Uses the flag of u64::MAX to indicate None; this does not +// need to be observable outside of PohRecorder. +#[derive(Clone)] +pub struct SharedLeaderFirstTickHeight(Arc); +const SHARED_LEADER_FIRST_TICK_HEIGHT_NONE: u64 = u64::MAX; + +impl SharedLeaderFirstTickHeight { + pub fn load(&self) -> Option { + let v = self.0.load(Ordering::Acquire); + if v == SHARED_LEADER_FIRST_TICK_HEIGHT_NONE { + None + } else { + Some(v) + } + } + + fn new(tick_height: Option) -> Self { + let v = tick_height.unwrap_or(SHARED_LEADER_FIRST_TICK_HEIGHT_NONE); + Self(Arc::new(AtomicU64::new(v))) + } + + // Mutable access not needed for this function. + // However we use it to guarantee only used when PohRecorder is + // write locked. + fn store(&mut self, tick_height: Option) { + let v = tick_height.unwrap_or(SHARED_LEADER_FIRST_TICK_HEIGHT_NONE); + self.0.store(v, Ordering::Release); + } +} + #[cfg(test)] mod tests { use { @@ -1045,7 +1129,7 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache[0].1, 1); - assert_eq!(poh_recorder.tick_height, 1); + assert_eq!(poh_recorder.tick_height(), 1); } #[test] @@ -1072,7 +1156,7 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); assert_eq!(poh_recorder.tick_cache[1].1, 2); - assert_eq!(poh_recorder.tick_height, 2); + assert_eq!(poh_recorder.tick_height(), 2); } #[test] @@ -1172,7 +1256,7 @@ mod tests { // all ticks are sent after height > min let tick_height_before = poh_recorder.tick_height(); poh_recorder.tick(); - assert_eq!(poh_recorder.tick_height, tick_height_before + 1); + assert_eq!(poh_recorder.tick_height(), tick_height_before + 1); assert_eq!(poh_recorder.tick_cache.len(), 0); let mut num_entries = 0; while let Ok((wbank, (_entry, _tick_height))) = entry_receiver.try_recv() { @@ -1212,12 +1296,12 @@ mod tests { poh_recorder.tick_cache.last().unwrap().1, bank.max_tick_height() + 1 ); - assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 1); + assert_eq!(poh_recorder.tick_height(), bank.max_tick_height() + 1); poh_recorder.set_bank_for_test(bank.clone()); poh_recorder.tick(); - assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 2); + assert_eq!(poh_recorder.tick_height(), bank.max_tick_height() + 2); assert!(poh_recorder.working_bank.is_none()); let mut num_entries = 0; while entry_receiver.try_recv().is_ok() { @@ -1340,7 +1424,7 @@ mod tests { // Check record succeeds on boundary condition where // poh_recorder.tick height == poh_recorder.working_bank.min_tick_height - assert_eq!(poh_recorder.tick_height, min_tick_height); + assert_eq!(poh_recorder.tick_height(), min_tick_height); let tx = test_tx(); let h1 = hash(b"hello world!"); assert!(poh_recorder @@ -1379,7 +1463,7 @@ mod tests { ); poh_recorder.set_bank_for_test(bank.clone()); - let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height; + let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height(); for _ in 0..num_ticks_to_max { poh_recorder.tick(); } @@ -1493,7 +1577,7 @@ mod tests { for _ in 0..remaining_ticks_to_min { poh_recorder.tick(); } - assert_eq!(poh_recorder.tick_height, remaining_ticks_to_min); + assert_eq!(poh_recorder.tick_height(), remaining_ticks_to_min); assert_eq!( poh_recorder.tick_cache.len(), remaining_ticks_to_min as usize @@ -1590,11 +1674,11 @@ mod tests { poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 4); - assert_eq!(poh_recorder.tick_height, 4); + assert_eq!(poh_recorder.tick_height(), 4); poh_recorder.reset(bank, Some((4, 4))); // parent slot 0 implies tick_height of 3 assert_eq!(poh_recorder.tick_cache.len(), 0); poh_recorder.tick(); - assert_eq!(poh_recorder.tick_height, DEFAULT_TICKS_PER_SLOT + 1); + assert_eq!(poh_recorder.tick_height(), DEFAULT_TICKS_PER_SLOT + 1); } #[test] @@ -1846,7 +1930,7 @@ mod tests { assert!(!poh_recorder.reached_leader_tick(&leader_b_pubkey, leader_b_start_tick)); // Tick through Leader A's remaining slots. - for _ in poh_recorder.tick_height..ticks_in_leader_slot_set { + for _ in poh_recorder.tick_height()..ticks_in_leader_slot_set { poh_recorder.tick(); }