Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 108 additions & 29 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use std::{
};
use thiserror::Error;

#[derive(PartialEq, Clone, Debug)]
#[derive(PartialEq, Clone, Debug, AbiExample)]
pub enum SwitchForkDecision {
SwitchProof(Hash),
NoSwitch,
FailedSwitchThreshold,
SameFork,
FailedSwitchThreshold(u64, u64),
}

impl SwitchForkDecision {
Expand All @@ -45,8 +45,11 @@ impl SwitchForkDecision {
authorized_voter_pubkey: &Pubkey,
) -> Option<Instruction> {
match self {
SwitchForkDecision::FailedSwitchThreshold => None,
SwitchForkDecision::NoSwitch => Some(vote_instruction::vote(
SwitchForkDecision::FailedSwitchThreshold(_, total_stake) => {
assert_ne!(*total_stake, 0);
None
}
SwitchForkDecision::SameFork => Some(vote_instruction::vote(
vote_account_pubkey,
authorized_voter_pubkey,
vote,
Expand All @@ -61,6 +64,10 @@ impl SwitchForkDecision {
}
}
}

/// Reports whether this decision permits casting a vote.
///
/// Only `FailedSwitchThreshold` blocks voting; every other decision
/// allows it.
pub fn can_vote(&self) -> bool {
    match self {
        SwitchForkDecision::FailedSwitchThreshold(_, _) => false,
        SwitchForkDecision::SwitchProof(_) | SwitchForkDecision::SameFork => true,
    }
}
}

pub const VOTE_THRESHOLD_DEPTH: usize = 8;
Expand Down Expand Up @@ -101,6 +108,8 @@ pub struct Tower {
// This could be emptied after some time; but left intact indefinitely for easier
// implementation
stray_restored_slot: Option<Slot>,
#[serde(skip)]
pub last_switch_threshold_check: Option<(Slot, SwitchForkDecision)>,
}

impl Default for Tower {
Expand All @@ -115,6 +124,7 @@ impl Default for Tower {
path: PathBuf::default(),
tmp_path: PathBuf::default(),
stray_restored_slot: Option::default(),
last_switch_threshold_check: Option::default(),
};
// VoteState::root_slot is ensured to be Some in Tower
tower.lockouts.root_slot = Some(Slot::default());
Expand Down Expand Up @@ -493,7 +503,7 @@ impl Tower {
false
}

pub(crate) fn check_switch_threshold(
fn make_check_switch_threshold_decision(
&self,
switch_slot: u64,
ancestors: &HashMap<Slot, HashSet<u64>>,
Expand All @@ -520,9 +530,34 @@ impl Tower {
// all of them.
panic!("no ancestors found with slot: {}", last_voted_slot);
} else {
// bank_forks doesn't have corresponding data for the stray restored last vote,
// meaning some inconsistency between saved tower and ledger.
// (newer snapshot, or only a saved tower is moved over to new setup?)
// This condition shouldn't occur under normal validator operation, indicating
// something unusual happened.
// Possible causes include: OS/HW crash, validator process crash, only saved tower
// is moved over to a new setup, etc...

// However, returning empty ancestors as a fallback here shouldn't result in
// slashing by itself (Note that we couldn't fully preclude any kind of slashing if
// the failure was OS or HW level).

// Firstly, lockout is ensured elsewhere.

// Also, there is no risk of optimistic conf. violation. Although empty ancestors
// could result in incorrect (= more than actual) locked_out_stake and
// false-positive SwitchProof later in this function, no such heavier fork
// candidate should exist in the first place if the last vote (or any of its
// unavailable ancestors) were already optimistically confirmed.
// The only exception is when another validator is already violating it...
if self.is_first_switch_check() && switch_slot < last_voted_slot {
// `switch < last` is needed so that we don't warn! merely because a
// newer snapshot was used on validator restart
let message = format!(
"bank_forks doesn't have corresponding data for the stray restored \
last vote({}), meaning some inconsistency between saved tower and ledger.",
last_voted_slot
);
warn!("{}", message);
datapoint_warn!("tower_warn", ("warn", message, String));
}
&empty_ancestors
}
});
Expand All @@ -532,7 +567,7 @@ impl Tower {
if switch_slot == last_voted_slot || switch_slot_ancestors.contains(&last_voted_slot) {
// If the `switch_slot` is a descendant of the last vote,
// no switching proof is necessary
return SwitchForkDecision::NoSwitch;
return SwitchForkDecision::SameFork;
}

// Should never consider switching to an ancestor
Expand Down Expand Up @@ -598,7 +633,7 @@ impl Tower {
}

// Only count lockouts on slots that are:
// 1) Not ancestors of `last_vote`
// 1) Not ancestors of `last_vote`, meaning they are on a different fork
// 2) Not from before the current root as we can't determine if
// anything before the root was an ancestor of `last_vote` or not
if !last_vote_ancestors.contains(lockout_interval_start)
Expand All @@ -622,10 +657,43 @@ impl Tower {
if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
SwitchForkDecision::SwitchProof(switch_proof)
} else {
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
}
})
.unwrap_or(SwitchForkDecision::NoSwitch)
.unwrap_or(SwitchForkDecision::SameFork)
}

/// Computes the switch-fork decision for `switch_slot` and records it.
///
/// The decision itself comes from `make_check_switch_threshold_decision`;
/// all arguments are forwarded to it unchanged. The resulting
/// `(switch_slot, decision)` pair is compared with
/// `last_switch_threshold_check`; when it differs (or nothing has been
/// recorded yet) the new pair is trace-logged and stored.
///
/// Returns the freshly computed decision in every case.
pub(crate) fn check_switch_threshold(
    &mut self,
    switch_slot: u64,
    ancestors: &HashMap<Slot, HashSet<u64>>,
    descendants: &HashMap<Slot, HashSet<u64>>,
    progress: &ProgressMap,
    total_stake: u64,
    epoch_vote_accounts: &HashMap<Pubkey, (u64, Account)>,
) -> SwitchForkDecision {
    let decision = self.make_check_switch_threshold_decision(
        switch_slot,
        ancestors,
        descendants,
        progress,
        total_stake,
        epoch_vote_accounts,
    );

    // Compare against the remembered check by reference so that a clone is
    // only made when the record actually has to be replaced.
    let already_recorded = matches!(
        &self.last_switch_threshold_check,
        Some((recorded_slot, recorded_decision))
            if *recorded_slot == switch_slot && *recorded_decision == decision
    );
    if !already_recorded {
        trace!(
            "new switch threshold check: slot {}: {:?}",
            switch_slot,
            decision,
        );
        self.last_switch_threshold_check = Some((switch_slot, decision.clone()));
    }

    decision
}

/// Returns `true` while no switch threshold check has been recorded yet,
/// i.e. `last_switch_threshold_check` is still `None`.
fn is_first_switch_check(&self) -> bool {
    matches!(self.last_switch_threshold_check, None)
}

pub fn check_vote_stake_threshold(
Expand Down Expand Up @@ -932,9 +1000,9 @@ impl Tower {
self.lockouts = vote_state;
self.do_initialize_lockouts(root, |v| v.slot > root);
trace!(
"{} lockouts initialized to {:?}",
"Lockouts in tower for {} is initialized using bank {}",
self.node_pubkey,
self.lockouts
bank.slot(),
);
assert_eq!(
self.lockouts.node_pubkey, self.node_pubkey,
Expand Down Expand Up @@ -986,6 +1054,7 @@ impl Tower {
bincode::serialize_into(&mut file, &saved_tower)?;
// file.sync_all() hurts performance; pipeline sync-ing and submitting votes to the cluster!
}
trace!("persisted votes: {:?}", self.voted_slots());
fs::rename(&new_filename, &filename)?;
// self.path.parent().sync_all() hurts performance same as the above sync

Expand Down Expand Up @@ -1047,6 +1116,16 @@ pub enum TowerError {
FatallyInconsistent(&'static str),
}

impl TowerError {
    /// Returns `true` when this error is an `IOError` whose kind is
    /// `std::io::ErrorKind::NotFound`, and `false` for any other I/O error
    /// kind or any other variant.
    pub fn is_file_missing(&self) -> bool {
        matches!(
            self,
            TowerError::IOError(io_err) if io_err.kind() == std::io::ErrorKind::NotFound
        )
    }
}

#[frozen_abi(digest = "Gaxfwvx5MArn52mKZQgzHmDCyn5YfCuTHvp5Et3rFfpp")]
#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq, AbiExample)]
pub struct SavedTower {
Expand Down Expand Up @@ -1267,7 +1346,7 @@ pub mod test {
&ancestors,
&descendants,
&self.progress,
&tower,
tower,
);

// Make sure this slot isn't locked out or failing threshold
Expand Down Expand Up @@ -1464,11 +1543,11 @@ pub mod test {
#[test]
fn test_to_vote_instruction() {
let vote = Vote::default();
let mut decision = SwitchForkDecision::FailedSwitchThreshold;
let mut decision = SwitchForkDecision::FailedSwitchThreshold(0, 1);
assert!(decision
.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default())
.is_none());
decision = SwitchForkDecision::NoSwitch;
decision = SwitchForkDecision::SameFork;
assert_eq!(
decision.to_vote_instruction(vote.clone(), &Pubkey::default(), &Pubkey::default()),
Some(vote_instruction::vote(
Expand Down Expand Up @@ -1571,7 +1650,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::NoSwitch
SwitchForkDecision::SameFork
);

// Trying to switch to another fork at 110 should fail
Expand All @@ -1584,7 +1663,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Adding another validator lockout on a descendant of last vote should
Expand All @@ -1599,7 +1678,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Adding another validator lockout on an ancestor of last vote should
Expand All @@ -1614,7 +1693,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Adding another validator lockout on a different fork, but the lockout
Expand All @@ -1629,7 +1708,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Adding another validator lockout on a different fork, and the lockout
Expand All @@ -1646,7 +1725,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Adding another validator lockout on a different fork, and the lockout
Expand Down Expand Up @@ -1697,7 +1776,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);
}

Expand Down Expand Up @@ -2365,7 +2444,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::NoSwitch
SwitchForkDecision::SameFork
);

// Trying to switch to another fork at 110 should fail
Expand All @@ -2378,7 +2457,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

vote_simulator.simulate_lockout_interval(111, (10, 49), &other_vote_account);
Expand Down Expand Up @@ -2456,7 +2535,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Add lockout_interval which should be excluded
Expand All @@ -2470,7 +2549,7 @@ pub mod test {
total_stake,
bank0.epoch_vote_accounts(0).unwrap(),
),
SwitchForkDecision::FailedSwitchThreshold
SwitchForkDecision::FailedSwitchThreshold(0, 20000)
);

// Add lockout_interval which should not be excluded
Expand Down
8 changes: 4 additions & 4 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl ReplayStage {
&ancestors,
&descendants,
&progress,
&tower,
&mut tower,
);
select_vote_and_reset_forks_time.stop();

Expand Down Expand Up @@ -1525,7 +1525,7 @@ impl ReplayStage {
ancestors: &HashMap<u64, HashSet<u64>>,
descendants: &HashMap<u64, HashSet<u64>>,
progress: &ProgressMap,
tower: &Tower,
tower: &mut Tower,
) -> SelectVoteAndResetForkResult {
// Try to vote on the actual heaviest fork. If the heaviest bank is
// locked out or fails the threshold check, the validator will:
Expand All @@ -1552,7 +1552,7 @@ impl ReplayStage {
.epoch_vote_accounts(heaviest_bank.epoch())
.expect("Bank epoch vote accounts must contain entry for the bank's own epoch"),
);
if switch_fork_decision == SwitchForkDecision::FailedSwitchThreshold {
if let SwitchForkDecision::FailedSwitchThreshold(_, _) = switch_fork_decision {
// If we can't switch, then reset to the next votable
// bank on the same fork as our last vote, but don't vote
info!(
Expand Down Expand Up @@ -1601,7 +1601,7 @@ impl ReplayStage {
if !is_locked_out
&& vote_threshold
&& propagation_confirmed
&& switch_fork_decision != SwitchForkDecision::FailedSwitchThreshold
&& switch_fork_decision.can_vote()
{
info!("voting: {} {}", bank.slot(), fork_weight);
SelectVoteAndResetForkResult {
Expand Down
11 changes: 3 additions & 8 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError},
consensus::{reconcile_blockstore_roots_with_tower, Tower},
contact_info::ContactInfo,
gossip_service::GossipService,
optimistically_confirmed_bank_tracker::{
Expand Down Expand Up @@ -730,12 +730,7 @@ fn post_process_restored_tower(
.unwrap_or_else(|err| {
let voting_has_been_active =
active_vote_account_exists_in_bank(&bank_forks.working_bank(), &vote_account);
let saved_tower_is_missing = if let TowerError::IOError(io_err) = &err {
io_err.kind() == std::io::ErrorKind::NotFound
} else {
false
};
if !saved_tower_is_missing {
if !err.is_file_missing() {
datapoint_error!(
"tower_error",
(
Expand All @@ -753,7 +748,7 @@ fn post_process_restored_tower(
);
process::exit(1);
}
if saved_tower_is_missing && !voting_has_been_active {
if err.is_file_missing() && !voting_has_been_active {
// Currently, don't protect against spoofed snapshots with no tower at all
info!(
"Ignoring expected failed tower restore because this is the initial \
Expand Down