From f39d91ebeabc5b81066fd4737c130584d2aae367 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Fri, 8 Mar 2024 14:29:52 -0800 Subject: [PATCH] Pass the final result of LastVotedForkSlots aggregation to next stage and find the heaviest fork we will Gossip to others. --- wen-restart/proto/wen_restart.proto | 13 + .../src/last_voted_fork_slots_aggregate.rs | 28 +- wen-restart/src/wen_restart.rs | 590 +++++++++++++++--- 3 files changed, 552 insertions(+), 79 deletions(-) diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index b25c2f17764bfd..98ea511a84f38c 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -20,10 +20,23 @@ message LastVotedForkSlotsRecord { message LastVotedForkSlotsAggregateRecord { map<string, LastVotedForkSlotsRecord> received = 1; + optional LastVotedForkSlotsAggregateFinal final_result = 2; +} + +message LastVotedForkSlotsAggregateFinal { + map<uint64, uint64> slots_stake_map = 1; + uint64 total_active_stake = 2; +} + +message HeaviestFork { + uint64 slot = 1; + string bankhash = 2; + uint64 total_active_stake = 3; } message WenRestartProgress { State state = 1; optional LastVotedForkSlotsRecord my_last_voted_fork_slots = 2; optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3; + optional HeaviestFork my_heaviest_fork = 4; } \ No newline at end of file diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs index 8a26c4d315f419..aecd48bc8f50f4 100644 --- a/wen-restart/src/last_voted_fork_slots_aggregate.rs +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -27,6 +27,12 @@ pub struct LastVotedForkSlotsAggregateResult { pub active_percent: f64, /* 0 ~ 100.0 */ } +#[derive(Clone, Debug, PartialEq)] +pub struct LastVotedForkSlotsFinalResult { + pub slots_stake_map: HashMap<Slot, u64>, + pub total_active_stake: u64, +} + impl LastVotedForkSlotsAggregate { pub(crate) fn new( root_slot: Slot, @@ 
-131,15 +137,25 @@ impl LastVotedForkSlotsAggregate { Some(record) } - pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult { - let total_stake = self.epoch_stakes.total_stake(); - let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { + fn total_active_stake(&self) -> u64 { + self.active_peers.iter().fold(0, |sum: u64, pubkey| { sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) - }); - let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0; + }) + } + + pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult { LastVotedForkSlotsAggregateResult { slots_to_repair: self.slots_to_repair.iter().cloned().collect(), - active_percent, + active_percent: self.total_active_stake() as f64 + / self.epoch_stakes.total_stake() as f64 + * 100.0, + } + } + + pub(crate) fn get_final_result(&self) -> LastVotedForkSlotsFinalResult { + LastVotedForkSlotsFinalResult { + slots_stake_map: self.slots_stake_map.clone(), + total_active_stake: self.total_active_stake(), } } } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index b14b7e4e840c61..8322fd0bf844f5 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -2,10 +2,13 @@ use { crate::{ - last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate, + last_voted_fork_slots_aggregate::{ + LastVotedForkSlotsAggregate, LastVotedForkSlotsFinalResult, + }, solana::wen_restart_proto::{ - self, LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord, - State as RestartState, WenRestartProgress, + self, HeaviestFork, LastVotedForkSlotsAggregateFinal, + LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord, State as RestartState, + WenRestartProgress, }, }, anyhow::Result, @@ -37,9 +40,14 @@ use { // If >42% of the validators have this block, repair this block locally. 
const REPAIR_THRESHOLD: f64 = 0.42; +// When counting Heaviest Fork, only count those > active_stake - 38% * total_stake. +const HEAVIEST_FORK_THRESHOLD: f64 = 0.38; #[derive(Debug, PartialEq)] pub enum WenRestartError { + BlockNotFound(Slot), + BlockNotLinkedToExpectedParent(Slot, Slot, Slot), + ChildStakeLargerThanParent(Slot, u64, Slot, u64), Exiting, InvalidLastVoteType(VoteTransaction), MalformedLastVotedForkSlotsProtobuf(Option), @@ -50,6 +58,28 @@ pub enum WenRestartError { impl std::fmt::Display for WenRestartError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + WenRestartError::BlockNotFound(slot) => { + write!(f, "Block should be repaired but not found: {}", slot) + } + WenRestartError::BlockNotLinkedToExpectedParent(slot, parent, expected_parent) => { + write!( + f, + "Block {} is not linked to expected parent {} but to {}", + slot, expected_parent, parent + ) + } + WenRestartError::ChildStakeLargerThanParent( + slot, + child_stake, + parent, + parent_stake, + ) => { + write!( + f, + "Block {} has more stake {} than its parent {} with stake {}", + slot, child_stake, parent, parent_stake + ) + } WenRestartError::Exiting => write!(f, "Exiting"), WenRestartError::InvalidLastVoteType(vote) => { write!(f, "Invalid last vote type: {:?}", vote) @@ -80,6 +110,11 @@ pub(crate) enum WenRestartProgressInternalState { }, LastVotedForkSlots { last_voted_fork_slots: Vec, + aggregate_final_result: Option, + }, + FindHeaviestFork { + aggregate_final_result: LastVotedForkSlotsFinalResult, + my_heaviest_fork: Option, }, Done, } @@ -107,7 +142,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( wen_restart_repair_slots: Arc>>, exit: Arc, progress: &mut WenRestartProgress, -) -> Result<()> { +) -> Result { let root_bank; { root_bank = bank_forks.read().unwrap().root_bank().clone(); @@ -131,6 +166,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( } else { progress.last_voted_fork_slots_aggregate = 
Some(LastVotedForkSlotsAggregateRecord { received: HashMap::new(), + final_result: None, }); } let mut cursor = solana_gossip::crds::Cursor::default(); @@ -196,7 +232,100 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( sleep(Duration::from_millis(time_left)); } } - Ok(()) + Ok(last_voted_fork_slots_aggregate.get_final_result()) +} + +pub(crate) fn find_heaviest_fork( + aggregate_final_result: LastVotedForkSlotsFinalResult, + bank_forks: Arc>, + exit: Arc, +) -> Result<(Slot, Hash)> { + // Under normal cases without duplicate blocks, we should have a single chain of + // blocks from the root to the newest block with > (67% - 5% - (100% - X%)) of stake. + // 1. If X% of the validators joined the restart (X > 80%), then there can't be two + // kids B and C of the same parent A with more than (67% - 5% - (100% - X%)) of + // the stake each. Assume B and C exists, their combined stake is more than + // 2 * (67% - 5% - (100% - X%)) = 134% - 10% - 2 * (100% - X%) = 2*X% - 76% > X%. + // Since we can only have X% of the stake, this is a contradiction. + // 2. If there are no duplicate blocks and a child got Y% of the stake, then its + // parent should at least have Y% of the stake. + // So, we can start from the latest block with > (67% - 5% - (100% - X%)) = X% - 38% + // of the stake and go up until we find the root. + // + // If somehow we find a block on slot D has Y% of the stake but its parent has less than Y%, + // that means block on D is a duplicate block and unfortunately we hold the wrong version. + // If the correct version of D got confirmed before the outage, 67% of the cluster holds + // the correct version and they should vote for it. We will see how many people have the + // correct snapshot in the final stage and those with wrong version can download snapshot. + // + // It is also a problem if we find a happy chain which doesn't chain to our root. 
+ // + // In both cases, we should stop and output warning on the smallest abnormal block we find + // in the chain, since that is likely the root of the problem. + // Therefore, we will select all slots greater than local root with more than + // (active_stake_percent - 38%) of the stake and sort them. Then for each slot, we check + // that the block we have: + // 1. Chains to the root/last block we examined + // 2. Has no more stake than its parent + // Abort if any block fails above checks and return error on the block. + + // Because everything else is stopped, it's okay to grab a big lock on bank_forks. + let my_bank_forks = bank_forks.read().unwrap(); + let root_bank = my_bank_forks.root_bank().clone(); + let root_slot = root_bank.slot(); + // TODO: Should use better epoch_stakes later. + let epoch_stake = root_bank.epoch_stakes(root_bank.epoch()).unwrap(); + let total_stake = epoch_stake.total_stake(); + let stake_threshold = aggregate_final_result + .total_active_stake + .saturating_sub((HEAVIEST_FORK_THRESHOLD * total_stake as f64) as u64); + let mut slots = aggregate_final_result + .slots_stake_map + .iter() + .filter(|(slot, stake)| **slot > root_slot && **stake > stake_threshold) + .map(|(slot, _)| *slot) + .collect::<Vec<Slot>>(); + slots.sort(); + let mut current_bank = root_bank; + let mut current_stake = aggregate_final_result + .slots_stake_map + .get(&root_slot) + .unwrap_or(&0); + for slot in slots { + if exit.load(Ordering::Relaxed) { + return Err(WenRestartError::Exiting.into()); + } + if let Some(new_bank) = my_bank_forks.get(slot) { + if new_bank.parent_slot() != current_bank.slot() { + return Err(WenRestartError::BlockNotLinkedToExpectedParent( + slot, + current_bank.slot(), + new_bank.parent_slot(), + ) + .into()); + } + let new_stake = aggregate_final_result + .slots_stake_map + .get(&slot) + .unwrap_or(&0); + // When we find a block with more stake than its parent, this is most likely a duplicate block + // and we have the wrong version.
We don't care how much stake root has, so we don't check it. + if current_bank.slot() != root_slot && new_stake > current_stake { + return Err(WenRestartError::ChildStakeLargerThanParent( + slot, + *new_stake, + current_bank.slot(), + *current_stake, + ) + .into()); + } + current_bank = new_bank; + current_stake = new_stake; + } else { + return Err(WenRestartError::BlockNotFound(slot).into()); + } + } + Ok((current_bank.slot(), current_bank.hash())) } pub fn wait_for_wen_restart( @@ -212,29 +341,72 @@ pub fn wait_for_wen_restart( let (mut state, mut progress) = initialize(wen_restart_path, last_vote.clone(), blockstore.clone())?; loop { - match &state { + state = match state { WenRestartProgressInternalState::Init { last_voted_fork_slots, last_vote_bankhash, } => { progress.my_last_voted_fork_slots = Some(send_restart_last_voted_fork_slots( cluster_info.clone(), + &last_voted_fork_slots, + last_vote_bankhash, + )?); + WenRestartProgressInternalState::Init { last_voted_fork_slots, - *last_vote_bankhash, - )?) 
+ last_vote_bankhash, + } } WenRestartProgressInternalState::LastVotedForkSlots { last_voted_fork_slots, - } => aggregate_restart_last_voted_fork_slots( - wen_restart_path, - wait_for_supermajority_threshold_percent, - cluster_info.clone(), - last_voted_fork_slots, - bank_forks.clone(), - wen_restart_repair_slots.clone().unwrap(), - exit.clone(), - &mut progress, - )?, + aggregate_final_result, + } => { + let final_result = match aggregate_final_result { + Some(result) => result, + None => aggregate_restart_last_voted_fork_slots( + wen_restart_path, + wait_for_supermajority_threshold_percent, + cluster_info.clone(), + &last_voted_fork_slots, + bank_forks.clone(), + wen_restart_repair_slots.clone().unwrap(), + exit.clone(), + &mut progress, + )?, + }; + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots, + aggregate_final_result: Some(final_result), + } + } + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result, + my_heaviest_fork, + } => { + let heaviest_fork = match my_heaviest_fork { + Some(heaviest_fork) => heaviest_fork, + None => { + let total_active_stake = aggregate_final_result.total_active_stake; + let (slot, bankhash) = find_heaviest_fork( + aggregate_final_result.clone(), + bank_forks.clone(), + exit.clone(), + )?; + info!( + "Heaviest fork found: slot: {}, bankhash: {}", + slot, bankhash + ); + HeaviestFork { + slot, + bankhash: bankhash.to_string(), + total_active_stake, + } + } + }; + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result, + my_heaviest_fork: Some(heaviest_fork), + } + } WenRestartProgressInternalState::Done => return Ok(()), }; state = increment_and_write_wen_restart_records(wen_restart_path, state, &mut progress)?; @@ -254,13 +426,42 @@ pub(crate) fn increment_and_write_wen_restart_records( progress.set_state(RestartState::LastVotedForkSlots); WenRestartProgressInternalState::LastVotedForkSlots { last_voted_fork_slots, + aggregate_final_result: None, } } 
WenRestartProgressInternalState::LastVotedForkSlots { last_voted_fork_slots: _, + aggregate_final_result, } => { - progress.set_state(RestartState::Done); - WenRestartProgressInternalState::Done + if let Some(aggregate_final_result) = aggregate_final_result { + progress.set_state(RestartState::HeaviestFork); + if let Some(aggregate_record) = progress.last_voted_fork_slots_aggregate.as_mut() { + aggregate_record.final_result = Some(LastVotedForkSlotsAggregateFinal { + slots_stake_map: aggregate_final_result.slots_stake_map.clone(), + total_active_stake: aggregate_final_result.total_active_stake, + }); + } + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result, + my_heaviest_fork: None, + } + } else { + return Err( + WenRestartError::UnexpectedState(RestartState::LastVotedForkSlots).into(), + ); + } + } + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result: _, + my_heaviest_fork, + } => { + if let Some(my_heaviest_fork) = my_heaviest_fork { + progress.set_state(RestartState::Done); + progress.my_heaviest_fork = Some(my_heaviest_fork.clone()); + WenRestartProgressInternalState::Done + } else { + return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into()); + } } WenRestartProgressInternalState::Done => { return Err(WenRestartError::UnexpectedState(RestartState::Done).into()) @@ -286,8 +487,7 @@ pub(crate) fn initialize( ); let progress = WenRestartProgress { state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }; write_wen_restart_records(records_path, &progress)?; progress @@ -343,6 +543,17 @@ pub(crate) fn initialize( Ok(( WenRestartProgressInternalState::LastVotedForkSlots { last_voted_fork_slots: record.last_voted_fork_slots.clone(), + aggregate_final_result: progress + .last_voted_fork_slots_aggregate + .as_ref() + .and_then(|r| { + r.final_result.as_ref().map(|result| { + LastVotedForkSlotsFinalResult { + 
slots_stake_map: result.slots_stake_map.clone(), + total_active_stake: result.total_active_stake, + } + }) + }), }, progress, )) @@ -350,6 +561,24 @@ pub(crate) fn initialize( Err(WenRestartError::MalformedLastVotedForkSlotsProtobuf(None).into()) } } + RestartState::HeaviestFork => Ok(( + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result: progress + .last_voted_fork_slots_aggregate + .as_ref() + .and_then(|r| { + r.final_result + .as_ref() + .map(|result| LastVotedForkSlotsFinalResult { + slots_stake_map: result.slots_stake_map.clone(), + total_active_stake: result.total_active_stake, + }) + }) + .unwrap(), + my_heaviest_fork: progress.my_heaviest_fork.clone(), + }, + progress, + )), _ => Err(WenRestartError::UnexpectedState(progress.state()).into()), } } @@ -377,7 +606,7 @@ pub(crate) fn write_wen_restart_records( #[cfg(test)] mod tests { use { - crate::wen_restart::*, + crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *}, assert_matches::assert_matches, solana_entry::entry, solana_gossip::{ @@ -415,7 +644,7 @@ mod tests { fn push_restart_last_voted_fork_slots( cluster_info: Arc, node: &LegacyContactInfo, - expected_slots_to_repair: &[Slot], + last_voted_fork_slots: &[Slot], last_vote_hash: &Hash, node_keypair: &Keypair, wallclock: u64, @@ -423,7 +652,7 @@ mod tests { let slots = RestartLastVotedForkSlots::new( *node.pubkey(), wallclock, - expected_slots_to_repair, + last_voted_fork_slots, *last_vote_hash, SHRED_VERSION, ) @@ -535,8 +764,7 @@ mod tests { let start = timestamp(); let mut progress = WenRestartProgress { state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }; loop { if let Ok(new_progress) = read_wen_restart_records(&wen_restart_proto_path) { @@ -552,6 +780,14 @@ mod tests { } } if timestamp().saturating_sub(start) > WAIT_FOR_THREAD_TIMEOUT { + assert_eq!( + progress.my_last_voted_fork_slots, + 
expected_progress.my_last_voted_fork_slots + ); + assert_eq!( + progress.last_voted_fork_slots_aggregate, + expected_progress.last_voted_fork_slots_aggregate + ); panic!( "wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}", &progress, @@ -598,10 +834,18 @@ mod tests { fn insert_and_freeze_slots( bank_forks: Arc>, + first_parent: Slot, expected_slots_to_repair: Vec, ) { - let mut parent_bank = bank_forks.read().unwrap().root_bank(); - for slot in expected_slots_to_repair { + let mut parent_bank = bank_forks + .read() + .unwrap() + .get(first_parent) + .unwrap() + .clone(); + let mut slots = expected_slots_to_repair.clone(); + slots.sort(); + for slot in slots { let mut bank_forks_rw = bank_forks.write().unwrap(); bank_forks_rw.insert(Bank::new_from_parent( parent_bank.clone(), @@ -615,6 +859,7 @@ mod tests { #[test] fn test_wen_restart_normal_flow() { + solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let wen_restart_repair_slots = Some(Arc::new(RwLock::new(Vec::new()))); let test_state = wen_restart_test_init(&ledger_path); @@ -626,6 +871,12 @@ mod tests { (last_vote_slot + 1..last_vote_slot + 3).collect(); let blockstore_clone = test_state.blockstore.clone(); let bank_forks_clone = test_state.bank_forks.clone(); + // The slots this validator voted on should be frozen already. + insert_and_freeze_slots( + test_state.bank_forks.clone(), + 0, + test_state.last_voted_fork_slots.clone(), + ); let wen_restart_thread_handle = Builder::new() .name("solana-wen-restart".to_string()) .spawn(move || { @@ -645,6 +896,9 @@ mod tests { let mut rng = rand::thread_rng(); let mut expected_messages = HashMap::new(); // Skip the first 2 validators, because 0 is myself, we only need 8 more to reach > 80%. 
+ let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone(); + last_voted_fork_slots_from_others.reverse(); + last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone()); for keypairs in test_state.validator_voting_keypairs.iter().skip(2) { let node_pubkey = keypairs.node_keypair.pubkey(); let node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); @@ -653,7 +907,7 @@ mod tests { push_restart_last_voted_fork_slots( test_state.cluster_info.clone(), &node, - &expected_slots_to_repair, + &last_voted_fork_slots_from_others, &last_vote_hash, &keypairs.node_keypair, now, @@ -661,7 +915,7 @@ mod tests { expected_messages.insert( node_pubkey.to_string(), LastVotedForkSlotsRecord { - last_voted_fork_slots: expected_slots_to_repair.clone(), + last_voted_fork_slots: last_voted_fork_slots_from_others.clone(), last_vote_bankhash: last_vote_hash.to_string(), shred_version: SHRED_VERSION as u32, wallclock: now, @@ -670,7 +924,11 @@ mod tests { } // Simulating successful repair of missing blocks. 
- insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + insert_and_freeze_slots( + test_state.bank_forks.clone(), + last_vote_slot, + expected_slots_to_repair.clone(), + ); let _ = wen_restart_thread_handle.join(); let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap(); @@ -679,6 +937,20 @@ mod tests { .as_ref() .unwrap() .wallclock; + let mut expected_slots_stake_map: HashMap = test_state + .last_voted_fork_slots + .iter() + .map(|slot| (*slot, 900)) + .collect(); + expected_slots_stake_map.extend(expected_slots_to_repair.iter().map(|slot| (*slot, 800))); + let expected_heaviest_fork_slot = last_vote_slot + 2; + let expected_heaviest_fork_bankhash = test_state + .bank_forks + .read() + .unwrap() + .get(expected_heaviest_fork_slot) + .unwrap() + .hash(); assert_eq!( progress, WenRestartProgress { @@ -690,10 +962,19 @@ mod tests { wallclock: progress_start_time, }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { - received: expected_messages + received: expected_messages, + final_result: Some(LastVotedForkSlotsAggregateFinal { + slots_stake_map: expected_slots_stake_map, + total_active_stake: 900, + }), + }), + my_heaviest_fork: Some(HeaviestFork { + slot: expected_heaviest_fork_slot, + bankhash: expected_heaviest_fork_bankhash.to_string(), + total_active_stake: 900 }), } - ) + ); } fn change_proto_file_readonly(wen_restart_proto_path: &PathBuf, readonly: bool) { @@ -759,8 +1040,7 @@ mod tests { assert_eq!(bankhash, last_vote_bankhash); assert_eq!(progress, WenRestartProgress { state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }); } ); @@ -768,8 +1048,7 @@ mod tests { &test_state.wen_restart_proto_path, &WenRestartProgress { state: RestartState::LastVotedForkSlots.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }, ); assert_eq!( @@ -787,8 
+1066,7 @@ mod tests { &test_state.wen_restart_proto_path, &WenRestartProgress { state: RestartState::WaitingForSupermajority.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }, ); assert_eq!( @@ -810,8 +1088,7 @@ mod tests { let test_state = wen_restart_test_init(&ledger_path); let progress = wen_restart_proto::WenRestartProgress { state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }; let original_progress = progress.clone(); assert_eq!( @@ -841,7 +1118,9 @@ mod tests { }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { received: HashMap::new(), + final_result: None, }), + ..Default::default() }, ); } @@ -852,8 +1131,7 @@ mod tests { let test_state = wen_restart_test_init(&ledger_path); let progress = wen_restart_proto::WenRestartProgress { state: RestartState::Init.into(), - my_last_voted_fork_slots: None, - last_voted_fork_slots_aggregate: None, + ..Default::default() }; assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok()); change_proto_file_readonly(&test_state.wen_restart_proto_path, true); @@ -882,13 +1160,15 @@ mod tests { }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { received: HashMap::new(), + final_result: None, }), + ..Default::default() }, ); } #[test] - fn test_wen_restart_aggregate_last_voted_fork_failures() { + fn test_wen_restart_aggregate_last_voted_fork_stop_and_restart() { solana_logger::setup(); let ledger_path = get_tmp_ledger_path_auto_delete!(); let test_state = wen_restart_test_init(&ledger_path); @@ -906,15 +1186,25 @@ mod tests { wallclock: start_time, }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { - received: HashMap::new() + received: HashMap::new(), + final_result: None, }), + ..Default::default() } ) .is_ok()); + insert_and_freeze_slots( + test_state.bank_forks.clone(), + 0, 
+ test_state.last_voted_fork_slots.clone(), + ); let mut rng = rand::thread_rng(); let mut expected_messages = HashMap::new(); let expected_slots_to_repair: Vec = (last_vote_slot + 1..last_vote_slot + 3).collect(); + let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone(); + last_voted_fork_slots_from_others.reverse(); + last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone()); // Skip the first 2 validators, because 0 is myself, we need 8 so it hits 80%. assert_eq!(test_state.validator_voting_keypairs.len(), 10); let progress = WenRestartProgress { @@ -925,7 +1215,7 @@ mod tests { shred_version: SHRED_VERSION as u32, wallclock: start_time, }), - last_voted_fork_slots_aggregate: None, + ..Default::default() }; for keypairs in test_state.validator_voting_keypairs.iter().skip(2) { let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); @@ -957,7 +1247,7 @@ mod tests { push_restart_last_voted_fork_slots( test_state.cluster_info.clone(), &node, - &expected_slots_to_repair, + &last_voted_fork_slots_from_others, &last_vote_hash, &keypairs.node_keypair, now, @@ -965,13 +1255,12 @@ mod tests { expected_messages.insert( node_pubkey.to_string(), LastVotedForkSlotsRecord { - last_voted_fork_slots: expected_slots_to_repair.clone(), + last_voted_fork_slots: last_voted_fork_slots_from_others.clone(), last_vote_bankhash: last_vote_hash.to_string(), shred_version: SHRED_VERSION as u32, wallclock: now, }, ); - // Wait for the newly pushed message to be in written proto file. wait_on_expected_progress_with_timeout( test_state.wen_restart_proto_path.clone(), WenRestartProgress { @@ -984,7 +1273,9 @@ mod tests { }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { received: expected_messages.clone(), + final_result: None, }), + ..Default::default() }, ); exit.store(true, Ordering::Relaxed); @@ -992,14 +1283,18 @@ mod tests { } // Simulating successful repair of missing blocks. 
- insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + insert_and_freeze_slots( + test_state.bank_forks.clone(), + last_vote_slot, + expected_slots_to_repair.clone(), + ); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); wen_restart_test_succeed_after_failure( test_state, last_vote_bankhash, WenRestartProgress { - state: RestartState::Done.into(), + state: RestartState::LastVotedForkSlots.into(), my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { last_voted_fork_slots, last_vote_bankhash: last_vote_bankhash.to_string(), @@ -1008,7 +1303,9 @@ mod tests { }), last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { received: expected_messages, + final_result: None, }), + ..Default::default() }, ); } @@ -1020,56 +1317,203 @@ mod tests { let mut wen_restart_proto_path = my_dir.path().to_path_buf(); wen_restart_proto_path.push("wen_restart_status.proto"); let last_vote_bankhash = Hash::new_unique(); - let mut state = WenRestartProgressInternalState::Init { - last_voted_fork_slots: vec![0, 1], - last_vote_bankhash, - }; let my_last_voted_fork_slots = Some(LastVotedForkSlotsRecord { last_voted_fork_slots: vec![0, 1], last_vote_bankhash: last_vote_bankhash.to_string(), shred_version: 0, wallclock: 0, }); - let mut progress = WenRestartProgress { - state: RestartState::Init.into(), - my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), - last_voted_fork_slots_aggregate: None, - }; - for (expected_state, expected_progress) in [ + let last_voted_fork_slots_aggregate = Some(LastVotedForkSlotsAggregateRecord { + received: HashMap::new(), + final_result: Some(LastVotedForkSlotsAggregateFinal { + slots_stake_map: vec![(0, 900), (1, 800)].into_iter().collect(), + total_active_stake: 900, + }), + }); + let expected_slots_stake_map: HashMap = + vec![(0, 900), (1, 800)].into_iter().collect(); + for (entrance_state, exit_state, entrance_progress, exit_progress) in [ ( + 
WenRestartProgressInternalState::Init { + last_voted_fork_slots: vec![0, 1], + last_vote_bankhash, + }, WenRestartProgressInternalState::LastVotedForkSlots { last_voted_fork_slots: vec![0, 1], + aggregate_final_result: None, }, WenRestartProgress { state: RestartState::LastVotedForkSlots.into(), my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), - last_voted_fork_slots_aggregate: None, + ..Default::default() + }, + WenRestartProgress { + state: RestartState::LastVotedForkSlots.into(), + my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + ..Default::default() }, ), ( + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots: vec![0, 1], + aggregate_final_result: Some(LastVotedForkSlotsFinalResult { + slots_stake_map: expected_slots_stake_map.clone(), + total_active_stake: 900, + }), + }, + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result: LastVotedForkSlotsFinalResult { + slots_stake_map: expected_slots_stake_map.clone(), + total_active_stake: 900, + }, + my_heaviest_fork: None, + }, + WenRestartProgress { + state: RestartState::LastVotedForkSlots.into(), + my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(), + ..Default::default() + }, + WenRestartProgress { + state: RestartState::HeaviestFork.into(), + my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(), + ..Default::default() + }, + ), + ( + WenRestartProgressInternalState::FindHeaviestFork { + aggregate_final_result: LastVotedForkSlotsFinalResult { + slots_stake_map: expected_slots_stake_map, + total_active_stake: 900, + }, + my_heaviest_fork: Some(HeaviestFork { + slot: 1, + bankhash: Hash::default().to_string(), + total_active_stake: 900, + }), + }, WenRestartProgressInternalState::Done, + WenRestartProgress { + state: RestartState::HeaviestFork.into(), + 
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(), + ..Default::default() + }, WenRestartProgress { state: RestartState::Done.into(), - my_last_voted_fork_slots, - last_voted_fork_slots_aggregate: None, + my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(), + my_heaviest_fork: Some(HeaviestFork { + slot: 1, + bankhash: Hash::default().to_string(), + total_active_stake: 900, + }), }, ), ] { - state = increment_and_write_wen_restart_records( + let mut progress = entrance_progress; + let state = increment_and_write_wen_restart_records( &wen_restart_proto_path, - state, + entrance_state, &mut progress, ) .unwrap(); - assert_eq!(&state, &expected_state); - assert_eq!(&progress, &expected_progress); + assert_eq!(&state, &exit_state); + assert_eq!(&progress, &exit_progress); } + let mut progress = WenRestartProgress { + state: RestartState::Done.into(), + my_last_voted_fork_slots: my_last_voted_fork_slots.clone(), + last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(), + ..Default::default() + }; assert_eq!( - increment_and_write_wen_restart_records(&wen_restart_proto_path, state, &mut progress) - .unwrap_err() - .downcast::() - .unwrap(), + increment_and_write_wen_restart_records( + &wen_restart_proto_path, + WenRestartProgressInternalState::Done, + &mut progress + ) + .unwrap_err() + .downcast::() + .unwrap(), WenRestartError::UnexpectedState(RestartState::Done), ); } + + #[test] + fn test_find_heaviest_fork_failures() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let exit = Arc::new(AtomicBool::new(false)); + let test_state = wen_restart_test_init(&ledger_path); + let last_vote_slot = test_state.last_voted_fork_slots[0]; + let slot_with_no_block = last_vote_slot + 5; + // This fails because corresponding block is not found, which is wrong, we should have + // repaired 
all eligible blocks when we exit LastVotedForkSlots state. + assert_eq!( + find_heaviest_fork( + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![(0, 900), (slot_with_no_block, 800)] + .into_iter() + .collect(), + total_active_stake: 900, + }, + test_state.bank_forks.clone(), + exit.clone(), + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::BlockNotFound(slot_with_no_block), + ); + // The slots this validator voted on should be frozen already. + insert_and_freeze_slots( + test_state.bank_forks.clone(), + 0, + test_state.last_voted_fork_slots.clone(), + ); + // The following fails because we expect to see the full chain from root to the last vote. + assert_eq!( + find_heaviest_fork( + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![(last_vote_slot, 900)].into_iter().collect(), + total_active_stake: 900, + }, + test_state.bank_forks.clone(), + exit.clone(), + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::BlockNotLinkedToExpectedParent(last_vote_slot, 0, last_vote_slot - 1), + ); + // The following fails because a child block has more stake than its parent. + let len = test_state.last_voted_fork_slots.len(); + let parent_with_less_stake = test_state.last_voted_fork_slots[len - 1]; + let child_with_more_stake = test_state.last_voted_fork_slots[len - 2]; + assert_eq!( + find_heaviest_fork( + LastVotedForkSlotsFinalResult { + slots_stake_map: vec![ + (parent_with_less_stake, 700), + (child_with_more_stake, 800) + ] + .into_iter() + .collect(), + total_active_stake: 900, + }, + test_state.bank_forks.clone(), + exit.clone(), + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::ChildStakeLargerThanParent( + child_with_more_stake, + 800, + parent_with_less_stake, + 700 + ), + ); + } }