Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ impl ReplayStage {
);
let mut current_leader = None;
let mut last_reset = Hash::default();
let mut last_reset_bank_descendants = Vec::new();
let mut partition_info = PartitionInfo::new();
let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayLoopTiming::default();
Expand Down Expand Up @@ -1017,7 +1018,17 @@ impl ReplayStage {
let mut reset_bank_time = Measure::start("reset_bank");
// Reset onto a fork
if let Some(reset_bank) = reset_bank {
if last_reset != reset_bank.last_blockhash() {
if last_reset == reset_bank.last_blockhash() {
let reset_bank_descendants =
Self::get_active_descendants(reset_bank.slot(), &progress, &blockstore);
if reset_bank_descendants != last_reset_bank_descendants {
last_reset_bank_descendants = reset_bank_descendants;
poh_recorder
.write()
.unwrap()
.update_start_bank_active_descendants(&last_reset_bank_descendants);
}
} else {
info!(
"vote bank: {:?} reset bank: {:?}",
vote_bank
Expand Down Expand Up @@ -1077,6 +1088,7 @@ impl ReplayStage {
&leader_schedule_cache,
);
last_reset = reset_bank.last_blockhash();
last_reset_bank_descendants = vec![];
tpu_has_bank = false;

if let Some(last_voted_slot) = tower.last_voted_slot() {
Expand Down Expand Up @@ -1358,6 +1370,23 @@ impl ReplayStage {
.unwrap_or(true)
}

fn get_active_descendants(
slot: Slot,
progress: &ProgressMap,
blockstore: &Blockstore,
) -> Vec<Slot> {
let Some(slot_meta) = blockstore.meta(slot).ok().flatten() else {
return vec![];
};

slot_meta
.next_slots
.iter()
.filter(|slot| !progress.is_dead(**slot).unwrap_or_default())
.copied()
.collect()
}

fn initialize_progress_and_fork_choice_with_locked_bank_forks(
bank_forks: &RwLock<BankForks>,
my_pubkey: &Pubkey,
Expand Down
3 changes: 3 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ pub struct ValidatorConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub delay_leader_block_for_pending_fork: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -342,6 +343,7 @@ impl Default for ValidatorConfig {
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
delay_leader_block_for_pending_fork: false,
}
}
}
Expand Down Expand Up @@ -942,6 +944,7 @@ impl Validator {
bank.clone(),
None,
bank.ticks_per_slot(),
config.delay_leader_block_for_pending_fork,
blockstore.clone(),
blockstore.get_new_shred_signal(0),
&leader_schedule_cache,
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
}
}

Expand Down
118 changes: 114 additions & 4 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,9 @@ pub struct PohRecorder {
pub poh: Arc<Mutex<Poh>>,
tick_height: u64,
clear_bank_signal: Option<Sender<bool>>,
start_bank: Arc<Bank>, // parent slot
start_tick_height: u64, // first tick_height this recorder will observe
start_bank: Arc<Bank>, // parent slot
start_bank_active_descendants: Vec<Slot>,
start_tick_height: u64, // first tick_height this recorder will observe
tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height
working_bank: Option<WorkingBank>,
sender: Sender<WorkingBankEntry>,
Expand All @@ -305,6 +306,8 @@ pub struct PohRecorder {
last_metric: Instant,
record_sender: Sender<Record>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
delay_leader_block_for_pending_fork: bool,
last_reported_slot_for_pending_fork: Arc<Mutex<Slot>>,
pub is_exited: Arc<AtomicBool>,
}

Expand Down Expand Up @@ -460,6 +463,14 @@ impl PohRecorder {
}
}

// Active descendants of the last reset bank that are smaller than the
// next leader slot could soon become the new reset bank.
fn is_new_reset_bank_pending(&self, next_slot: Slot) -> bool {
self.start_bank_active_descendants
.iter()
.any(|pending_slot| *pending_slot < next_slot)
}

fn reached_leader_tick(
&self,
my_pubkey: &Pubkey,
Expand All @@ -475,8 +486,38 @@ impl PohRecorder {
|| self.start_tick_height + self.grace_ticks
== leader_first_tick_height_including_grace_ticks
|| (self.tick_height >= ideal_target_tick_height
&& (self.prev_slot_was_mine(my_pubkey, next_slot)
|| !self.is_same_fork_as_previous_leader(next_slot)))
&& (self.prev_slot_was_mine(my_pubkey, next_slot) || {
// If we are not reset to a bank by the previous leader, skip grace
// ticks unless there is a pending reset bank and we are configured
// to apply grace ticks when we detect a pending reset bank.
!self.is_same_fork_as_previous_leader(next_slot)
&& (!self.is_new_reset_bank_pending(next_slot) || {
self.report_pending_fork_was_detected(next_slot);
!self.delay_leader_block_for_pending_fork
})
}))
}

// Report metrics when poh recorder detects a pending fork that could
// soon lead to poh reset.
fn report_pending_fork_was_detected(&self, next_slot: Slot) {
// Only report once per next leader slot to avoid spamming metrics. It's
// enough to know that a leader decided to delay or not once per slot
let mut last_slot = self.last_reported_slot_for_pending_fork.lock().unwrap();
if *last_slot == next_slot {
return;
}
*last_slot = next_slot;

datapoint_info!(
"poh_recorder-detected_pending_fork",
("next_leader_slot", next_slot, i64),
(
"did_delay_leader_slot",
self.delay_leader_block_for_pending_fork,
bool
),
);
}

pub fn start_slot(&self) -> Slot {
Expand Down Expand Up @@ -566,11 +607,18 @@ impl PohRecorder {
self.tick_cache = vec![];
if reset_start_bank {
self.start_bank = reset_bank;
self.start_bank_active_descendants = vec![];
}
self.tick_height = (self.start_slot() + 1) * self.ticks_per_slot;
self.start_tick_height = self.tick_height + 1;
}

// update the list of active descendants of the start bank to make a better
// decision about whether to use grace ticks
pub fn update_start_bank_active_descendants(&mut self, active_descendants: &[Slot]) {
self.start_bank_active_descendants = active_descendants.to_vec();
}

// synchronize PoH with a bank
pub fn reset(&mut self, reset_bank: Arc<Bank>, next_leader_slot: Option<(Slot, Slot)>) {
self.clear_bank();
Expand Down Expand Up @@ -941,6 +989,7 @@ impl PohRecorder {
start_bank: Arc<Bank>,
next_leader_slot: Option<(Slot, Slot)>,
ticks_per_slot: u64,
delay_leader_block_for_pending_fork: bool,
blockstore: Arc<Blockstore>,
clear_bank_signal: Option<Sender<bool>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
Expand Down Expand Up @@ -973,6 +1022,7 @@ impl PohRecorder {
poh_timing_point_sender,
clear_bank_signal,
start_bank,
start_bank_active_descendants: vec![],
start_tick_height: tick_height + 1,
leader_first_tick_height_including_grace_ticks,
leader_last_tick_height,
Expand All @@ -993,6 +1043,8 @@ impl PohRecorder {
last_metric: Instant::now(),
record_sender,
leader_bank_notifier: Arc::default(),
delay_leader_block_for_pending_fork,
last_reported_slot_for_pending_fork: Arc::default(),
is_exited,
},
receiver,
Expand All @@ -1015,12 +1067,14 @@ impl PohRecorder {
poh_config: &PohConfig,
is_exited: Arc<AtomicBool>,
) -> (Self, Receiver<WorkingBankEntry>, Receiver<Record>) {
let delay_leader_block_for_pending_fork = false;
Self::new_with_clear_signal(
tick_height,
last_entry_hash,
start_bank,
next_leader_slot,
ticks_per_slot,
delay_leader_block_for_pending_fork,
blockstore,
None,
leader_schedule_cache,
Expand Down Expand Up @@ -1730,6 +1784,7 @@ mod tests {
bank.clone(),
None,
bank.ticks_per_slot(),
false,
Arc::new(blockstore),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
Expand Down Expand Up @@ -2029,6 +2084,61 @@ mod tests {
parent_slot: 4,
}
);

// Test that grace ticks are not required if the previous leader's 4
// slots got skipped.
{
poh_recorder.reset(bank4.clone(), Some((9, 9)));

// Tick until leader slot
for _ in 0..4 * bank4.ticks_per_slot() {
poh_recorder.tick();
}

// We are due to lead
assert_eq!(
poh_recorder.reached_leader_slot(&test_validator_pubkey),
PohLeaderStatus::Reached {
poh_slot: 9,
parent_slot: 4,
}
);

// Add an active descendant which is considered to be a pending new
// reset bank
poh_recorder.update_start_bank_active_descendants(&[5]);
assert!(poh_recorder.is_new_reset_bank_pending(8));

// Without setting delay_leader_block_for_pending_fork, skip grace ticks
assert_eq!(
poh_recorder.reached_leader_slot(&test_validator_pubkey),
PohLeaderStatus::Reached {
poh_slot: 9,
parent_slot: 4,
}
);

// After setting delay_leader_block_for_pending_fork, grace ticks are required
poh_recorder.delay_leader_block_for_pending_fork = true;
assert_eq!(
poh_recorder.reached_leader_slot(&test_validator_pubkey),
PohLeaderStatus::NotReached,
);

// Tick through grace ticks
for _ in 0..poh_recorder.grace_ticks {
poh_recorder.tick();
}

// After grace ticks, we are due to lead
assert_eq!(
poh_recorder.reached_leader_slot(&test_validator_pubkey),
PohLeaderStatus::Reached {
poh_slot: 9,
parent_slot: 4,
}
);
}
}

#[test]
Expand Down
14 changes: 14 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,20 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.takes_value(false)
.help("Disables the banking trace"),
)
.arg(
Arg::with_name("delay_leader_block_for_pending_fork")
.hidden(hidden_unless_forced())
.long("delay-leader-block-for-pending-fork")
.takes_value(false)
.help(
"Delay leader block creation while replaying a block which descends from the \
current fork and has a lower slot than our next leader slot. If we don't \
delay here, our new leader block will be on a different fork from the \
block we are replaying and there is a high chance that the cluster will \
confirm that block's fork rather than our leader block's fork because it \
was created before we started creating ours.",
),
)
.arg(
Arg::with_name("block_verification_method")
.long("block-verification-method")
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,8 @@ pub fn main() {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
delay_leader_block_for_pending_fork: matches
.is_present("delay_leader_block_for_pending_fork"),
..ValidatorConfig::default()
};

Expand Down