diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 2ccf37ed7d7788..35b62f23fb74d2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -626,6 +626,7 @@ impl ReplayStage { ); let mut current_leader = None; let mut last_reset = Hash::default(); + let mut last_reset_bank_descendants = Vec::new(); let mut partition_info = PartitionInfo::new(); let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayLoopTiming::default(); @@ -1017,7 +1018,17 @@ impl ReplayStage { let mut reset_bank_time = Measure::start("reset_bank"); // Reset onto a fork if let Some(reset_bank) = reset_bank { - if last_reset != reset_bank.last_blockhash() { + if last_reset == reset_bank.last_blockhash() { + let reset_bank_descendants = + Self::get_active_descendants(reset_bank.slot(), &progress, &blockstore); + if reset_bank_descendants != last_reset_bank_descendants { + last_reset_bank_descendants = reset_bank_descendants; + poh_recorder + .write() + .unwrap() + .update_start_bank_active_descendants(&last_reset_bank_descendants); + } + } else { info!( "vote bank: {:?} reset bank: {:?}", vote_bank @@ -1077,6 +1088,7 @@ impl ReplayStage { &leader_schedule_cache, ); last_reset = reset_bank.last_blockhash(); + last_reset_bank_descendants = vec![]; tpu_has_bank = false; if let Some(last_voted_slot) = tower.last_voted_slot() { @@ -1358,6 +1370,23 @@ impl ReplayStage { .unwrap_or(true) } + fn get_active_descendants( + slot: Slot, + progress: &ProgressMap, + blockstore: &Blockstore, + ) -> Vec { + let Some(slot_meta) = blockstore.meta(slot).ok().flatten() else { + return vec![]; + }; + + slot_meta + .next_slots + .iter() + .filter(|slot| !progress.is_dead(**slot).unwrap_or_default()) + .copied() + .collect() + } + fn initialize_progress_and_fork_choice_with_locked_bank_forks( bank_forks: &RwLock, my_pubkey: &Pubkey, diff --git a/core/src/validator.rs b/core/src/validator.rs index 52266b8f1372f9..513b207e69e948 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -272,6 +272,7 @@ pub struct ValidatorConfig { pub ip_echo_server_threads: NonZeroUsize, pub replay_forks_threads: NonZeroUsize, pub replay_transactions_threads: NonZeroUsize, + pub delay_leader_block_for_pending_fork: bool, } impl Default for ValidatorConfig { @@ -342,6 +343,7 @@ impl Default for ValidatorConfig { ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + delay_leader_block_for_pending_fork: false, } } } @@ -942,6 +944,7 @@ impl Validator { bank.clone(), None, bank.ticks_per_slot(), + config.delay_leader_block_for_pending_fork, blockstore.clone(), blockstore.get_new_shred_signal(0), &leader_schedule_cache, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 0e4ee5a9af31ff..c01106bda4d4e5 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -71,6 +71,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { ip_echo_server_threads: config.ip_echo_server_threads, replay_forks_threads: config.replay_forks_threads, replay_transactions_threads: config.replay_transactions_threads, + delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, } } diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 8cabc193b966b1..902f24887d95dc 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -280,8 +280,9 @@ pub struct PohRecorder { pub poh: Arc>, tick_height: u64, clear_bank_signal: Option>, - start_bank: Arc, // parent slot - start_tick_height: u64, // first tick_height this recorder will observe + start_bank: Arc, // parent slot + start_bank_active_descendants: Vec, + start_tick_height: u64, // first tick_height this recorder will observe tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height working_bank: Option, sender: Sender, @@ -305,6 +306,8 @@ pub struct PohRecorder { last_metric: Instant, record_sender: Sender, leader_bank_notifier: Arc, + delay_leader_block_for_pending_fork: bool, + last_reported_slot_for_pending_fork: Arc>, pub is_exited: Arc, } @@ -460,6 +463,14 @@ impl PohRecorder { } } + // Active descendants of the last reset bank that are smaller than the + // next leader slot could soon become the new reset bank. + fn is_new_reset_bank_pending(&self, next_slot: Slot) -> bool { + self.start_bank_active_descendants + .iter() + .any(|pending_slot| *pending_slot < next_slot) + } + fn reached_leader_tick( &self, my_pubkey: &Pubkey, @@ -475,8 +486,38 @@ impl PohRecorder { || self.start_tick_height + self.grace_ticks == leader_first_tick_height_including_grace_ticks || (self.tick_height >= ideal_target_tick_height - && (self.prev_slot_was_mine(my_pubkey, next_slot) - || !self.is_same_fork_as_previous_leader(next_slot))) + && (self.prev_slot_was_mine(my_pubkey, next_slot) || { + // If we are not reset to a bank by the previous leader, skip grace + // ticks unless there is a pending reset bank and we are configured + // to apply grace ticks when we detect a pending reset bank. + !self.is_same_fork_as_previous_leader(next_slot) + && (!self.is_new_reset_bank_pending(next_slot) || { + self.report_pending_fork_was_detected(next_slot); + !self.delay_leader_block_for_pending_fork + }) + })) + } + + // Report metrics when poh recorder detects a pending fork that could + // soon lead to poh reset. + fn report_pending_fork_was_detected(&self, next_slot: Slot) { + // Only report once per next leader slot to avoid spamming metrics. It's + // enough to know that a leader decided to delay or not once per slot + let mut last_slot = self.last_reported_slot_for_pending_fork.lock().unwrap(); + if *last_slot == next_slot { + return; + } + *last_slot = next_slot; + + datapoint_info!( + "poh_recorder-detected_pending_fork", + ("next_leader_slot", next_slot, i64), + ( + "did_delay_leader_slot", + self.delay_leader_block_for_pending_fork, + bool + ), + ); } pub fn start_slot(&self) -> Slot { @@ -566,11 +607,18 @@ impl PohRecorder { self.tick_cache = vec![]; if reset_start_bank { self.start_bank = reset_bank; + self.start_bank_active_descendants = vec![]; } self.tick_height = (self.start_slot() + 1) * self.ticks_per_slot; self.start_tick_height = self.tick_height + 1; } + // update the list of active descendants of the start bank to make a better + // decision about whether to use grace ticks + pub fn update_start_bank_active_descendants(&mut self, active_descendants: &[Slot]) { + self.start_bank_active_descendants = active_descendants.to_vec(); + } + // synchronize PoH with a bank pub fn reset(&mut self, reset_bank: Arc, next_leader_slot: Option<(Slot, Slot)>) { self.clear_bank(); @@ -941,6 +989,7 @@ impl PohRecorder { start_bank: Arc, next_leader_slot: Option<(Slot, Slot)>, ticks_per_slot: u64, + delay_leader_block_for_pending_fork: bool, blockstore: Arc, clear_bank_signal: Option>, leader_schedule_cache: &Arc, @@ -973,6 +1022,7 @@ impl PohRecorder { poh_timing_point_sender, clear_bank_signal, start_bank, + start_bank_active_descendants: vec![], start_tick_height: tick_height + 1, leader_first_tick_height_including_grace_ticks, leader_last_tick_height, @@ -993,6 +1043,8 @@ impl PohRecorder { last_metric: Instant::now(), record_sender, leader_bank_notifier: Arc::default(), + delay_leader_block_for_pending_fork, + last_reported_slot_for_pending_fork: Arc::default(), is_exited, }, receiver, @@ -1015,12 +1067,14 @@ impl PohRecorder { poh_config: &PohConfig, is_exited: Arc, ) -> (Self, Receiver, Receiver) { + let delay_leader_block_for_pending_fork = false; Self::new_with_clear_signal( tick_height, last_entry_hash, start_bank, next_leader_slot, ticks_per_slot, + delay_leader_block_for_pending_fork, blockstore, None, leader_schedule_cache, @@ -1730,6 +1784,7 @@ mod tests { bank.clone(), None, bank.ticks_per_slot(), + false, Arc::new(blockstore), Some(sender), &Arc::new(LeaderScheduleCache::default()), @@ -2029,6 +2084,61 @@ mod tests { parent_slot: 4, } ); + + // Test that grace ticks are not required if the previous leader's 4 + // slots got skipped. + { + poh_recorder.reset(bank4.clone(), Some((9, 9))); + + // Tick until leader slot + for _ in 0..4 * bank4.ticks_per_slot() { + poh_recorder.tick(); + } + + // We are due to lead + assert_eq!( + poh_recorder.reached_leader_slot(&test_validator_pubkey), + PohLeaderStatus::Reached { + poh_slot: 9, + parent_slot: 4, + } + ); + + // Add an active descendant which is considered to be a pending new + // reset bank + poh_recorder.update_start_bank_active_descendants(&[5]); + assert!(poh_recorder.is_new_reset_bank_pending(8)); + + // Without setting delay_leader_block_for_pending_fork, skip grace ticks + assert_eq!( + poh_recorder.reached_leader_slot(&test_validator_pubkey), + PohLeaderStatus::Reached { + poh_slot: 9, + parent_slot: 4, + } + ); + + // After setting delay_leader_block_for_pending_fork, grace ticks are required + poh_recorder.delay_leader_block_for_pending_fork = true; + assert_eq!( + poh_recorder.reached_leader_slot(&test_validator_pubkey), + PohLeaderStatus::NotReached, + ); + + // Tick through grace ticks + for _ in 0..poh_recorder.grace_ticks { + poh_recorder.tick(); + } + + // After grace ticks, we are due to lead + assert_eq!( + poh_recorder.reached_leader_slot(&test_validator_pubkey), + PohLeaderStatus::Reached { + poh_slot: 9, + parent_slot: 4, + } + ); + } } #[test] diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 7fc525477ef41e..0eed324a9a9d0c 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1497,6 +1497,20 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .takes_value(false) .help("Disables the banking trace"), ) + .arg( + Arg::with_name("delay_leader_block_for_pending_fork") + .hidden(hidden_unless_forced()) + .long("delay-leader-block-for-pending-fork") + .takes_value(false) + .help( + "Delay leader block creation while replaying a block which descends from the \ + current fork and has a lower slot than our next leader slot. If we don't \ + delay here, our new leader block will be on a different fork from the \ + block we are replaying and there is a high chance that the cluster will \ + confirm that block's fork rather than our leader block's fork because it \ + was created before we started creating ours.", + ), + ) .arg( Arg::with_name("block_verification_method") .long("block-verification-method") diff --git a/validator/src/main.rs b/validator/src/main.rs index d1aab8615b65d2..897fbb6e0bb9a1 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1481,6 +1481,8 @@ pub fn main() { ip_echo_server_threads, replay_forks_threads, replay_transactions_threads, + delay_leader_block_for_pending_fork: matches + .is_present("delay_leader_block_for_pending_fork"), ..ValidatorConfig::default() };