From 8070c3a09c982cec3aba372fab2a92c20f6f69f4 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 11 Jul 2024 21:31:48 +0000 Subject: [PATCH 01/11] replay: extend last fec set check for 32+ retransmitter signed shreds --- core/src/replay_stage.rs | 75 +++++---- ledger/src/blockstore.rs | 261 ++++++++++++++++++++++++++--- ledger/src/blockstore_db.rs | 2 + ledger/src/blockstore_processor.rs | 3 + ledger/src/shred.rs | 16 ++ 5 files changed, 299 insertions(+), 58 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 133fd97bb0d2dd..0390b50b9d8b3a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3077,48 +3077,51 @@ impl ReplayStage { } if bank.collector_id() != my_pubkey { - // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK shreds in the last FEC set, - // mark it dead. No reason to perform this check on our leader block. - if !blockstore - .is_last_fec_set_full(bank.slot()) - .inspect_err(|e| { + // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted + // shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block. + let check_result = match blockstore.check_last_fec_set(bank.slot()) { + Ok(last_fec_set_check_results) => { + // Update metrics regardless of feature flag + last_fec_set_check_results.report_metrics(bank_slot, bank.hash()); + // Get a final result based on the feature flags + last_fec_set_check_results.get_result(&bank.feature_set) + } + Err(e) => { warn!( - "Unable to determine if last fec set is full for slot {} {}, + "Unable to check the last fec set for slot {} {}, marking as dead: {e:?}", bank.slot(), bank.hash() - ) - }) - .unwrap_or(false) - { - // Update metric regardless of feature flag - datapoint_warn!( - "incomplete_final_fec_set", - ("slot", bank_slot, i64), - ("hash", bank.hash().to_string(), String) - ); - if bank - .feature_set - .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - { - let root = bank_forks.read().unwrap().root(); - Self::mark_dead_slot( - blockstore, - bank, - root, - &BlockstoreProcessorError::IncompleteFinalFecSet, - rpc_subscriptions, - duplicate_slots_tracker, - duplicate_confirmed_slots, - epoch_slots_frozen_slots, - progress, - heaviest_subtree_fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - purge_repair_slot_counter, ); - continue; + if bank + .feature_set + .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) + { + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + } else { + Ok(()) + } } + }; + + if let Err(result_err) = check_result { + let root = bank_forks.read().unwrap().root(); + Self::mark_dead_slot( + blockstore, + bank, + root, + &result_err, + rpc_subscriptions, + duplicate_slots_tracker, + duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + ); + continue; } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 0342a323905876..e73f0d9a27aea1 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -15,6 +15,7 @@ use { AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_FIFO, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL, }, + blockstore_processor::BlockstoreProcessorError, leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ @@ -47,6 +48,7 @@ use { account::ReadableAccount, address_lookup_table::state::AddressLookupTable, clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, + feature_set::FeatureSet, genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, hash::Hash, pubkey::Pubkey, @@ -170,6 +172,48 @@ impl AsRef for WorkingEntry { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct LastFECSetCheckResults { + is_full: bool, + is_retransmitter_signed: bool, +} + +impl LastFECSetCheckResults { + pub fn report_metrics(&self, slot: Slot, hash: Hash) { + if !self.is_full { + datapoint_warn!( + "incomplete_final_fec_set", + ("slot", slot, i64), + ("hash", hash.to_string(), String) + ); + } + if !self.is_retransmitter_signed { + datapoint_warn!( + "invalid_retransmitter_signature_final_fec_set", + ("slot", slot, i64), + ("hash", hash.to_string(), String) + ); + } + } + + pub fn get_result( + &self, + feature_set: &FeatureSet, + ) -> std::result::Result<(), BlockstoreProcessorError> { + if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) + && !self.is_full + { + return Err(BlockstoreProcessorError::IncompleteFinalFecSet); + } else if feature_set + .is_active(&solana_sdk::feature_set::verify_retransmitter_signature::id()) + && !self.is_retransmitter_signed + { + return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); + } + Ok(()) + } +} + pub struct InsertResults { completed_data_set_infos: Vec, duplicate_shreds: Vec, @@ -3680,15 +3724,21 @@ impl Blockstore { self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta) } - /// Returns true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of a - /// slot have the same merkle root, indicating they are a part of the same - /// FEC set. + /// Performs checks on the last FEC set for this slot. + /// - `is_full` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of + /// `slot` have the same merkle root, indicating they are a part of the same FEC set. + /// This indicates that the last FEC set is sufficiently sized. + /// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` + /// data shreds of `slot` are of the retransmitter variant. Since we already discard + /// invalid signatures on ingestion, this indicates that the last FEC set is properly + /// signed by retransmitters. + /// /// Will fail if: /// - Slot meta is missing /// - LAST_SHRED_IN_SLOT flag has not been received /// - There are missing shreds in the last fec set /// - The block contains legacy shreds - pub fn is_last_fec_set_full(&self, slot: Slot) -> Result { + pub fn check_last_fec_set(&self, slot: Slot) -> Result { // We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds. // We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block. // Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has @@ -3703,11 +3753,14 @@ impl Blockstore { const_assert_eq!(MINIMUM_INDEX, 31); let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else { warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1); - return Ok(false); + return Ok(LastFECSetCheckResults { + is_full: false, + is_retransmitter_signed: false, + }); }; let keys = (start_index..=last_shred_index).map(|index| (slot, index)); - let last_merkle_roots: Vec = self + let (merkle_root, is_retransmitter_signed) = self .data_shred_cf .multi_get_bytes(keys) .into_iter() @@ -3718,17 +3771,42 @@ impl Blockstore { warn!("Missing shred for {slot} index {shred_index}"); BlockstoreError::MissingShred(slot, shred_index) })?; - shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { - let shred_index = start_index + u64::try_from(offset).unwrap(); - warn!("Found legacy shred for {slot}, index {shred_index}"); - BlockstoreError::LegacyShred(slot, shred_index) - }) + let is_retransmitter_signed = shred::layout::is_retransmitter_signed(&shred_bytes) + .map_err(|_| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Found legacy shred for {slot}, index {shred_index}"); + BlockstoreError::LegacyShred(slot, shred_index) + })?; + let merkle_root = + shred::layout::get_merkle_root(&shred_bytes).ok_or_else(|| { + let shred_index = start_index + u64::try_from(offset).unwrap(); + warn!("Unable to read merkle root for {slot}, index {shred_index}"); + BlockstoreError::MissingMerkleRoot(slot, shred_index) + })?; + Ok((merkle_root, is_retransmitter_signed)) }) - .dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok()) - .collect::>>()?; + .process_results::<_, _, BlockstoreError, _>(|mut iter| { + // lift and dedup merkle root and retransmitter individually to avoid allocation + // and still be able to report metrics accurately for each check. + let Some((merkle_root, retransmitter_signed)) = iter.next() else { + return (None, false); + }; + iter.fold( + (Some(merkle_root), retransmitter_signed), + |(mr_acc, rs_acc), (mr, rs)| { + ( + mr_acc.and_then(|mr_acc| (mr_acc == mr).then_some(mr)), + rs_acc && rs, + ) + }, + ) + })?; - // After the dedup there should be exactly one Hash left if the shreds were part of the same FEC set. - Ok(last_merkle_roots.len() == 1) + // After the dedup there should be exactly one Hash left and one true value + Ok(LastFECSetCheckResults { + is_full: merkle_root.is_some(), + is_retransmitter_signed, + }) } /// Returns a mapping from each elements of `slots` to a list of the @@ -5224,6 +5302,7 @@ pub mod tests { solana_runtime::bank::{Bank, RewardType}, solana_sdk::{ clock::{DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT}, + feature_set::{verify_retransmitter_signature, vote_only_full_fec_sets}, hash::{self, hash, Hash}, instruction::CompiledInstruction, message::v0::LoadedAddresses, @@ -11870,7 +11949,7 @@ pub mod tests { } #[test] - fn test_is_last_fec_set_full() { + fn test_check_last_fec_set() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -11887,7 +11966,7 @@ pub mod tests { // Missing slot meta assert_matches!( - blockstore.is_last_fec_set_full(0), + blockstore.check_last_fec_set(0), Err(BlockstoreError::SlotUnavailable) ); @@ -11902,7 +11981,7 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert!(meta.last_index.is_none()); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::UnknownLastIndex(_)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -11914,14 +11993,16 @@ pub mod tests { let meta = blockstore.meta(slot).unwrap().unwrap(); assert_eq!(meta.last_index, Some(total_shreds - 1)); assert_matches!( - blockstore.is_last_fec_set_full(slot), + blockstore.check_last_fec_set(slot), Err(BlockstoreError::MissingShred(_, _)) ); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Full slot blockstore.insert_shreds(data_shreds, None, false).unwrap(); - assert!(blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(results.is_full); + assert!(results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has less than DATA_SHREDS_PER_FEC_BLOCK shreds in total @@ -11959,7 +12040,9 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(!results.is_full); + assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Slot has more than DATA_SHREDS_PER_FEC_BLOCK in total, but last FEC set has less @@ -11998,6 +12081,140 @@ pub mod tests { let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); - assert!(!blockstore.is_last_fec_set_full(slot).unwrap()); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(!results.is_full); + assert!(!results.is_retransmitter_signed); + blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); + + // Slot contains retransmitter signed shreds, but they are part of 2 different FEC sets + let mut fec_set_index = 0; + let (first_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 100, + fec_set_index, + // Set merkle root here to make sure it is a resigned shred + Some(Hash::new_from_array(rand::thread_rng().gen())), + true, + ); + let merkle_root = first_data_shreds[0].merkle_root().unwrap(); + fec_set_index += first_data_shreds.len() as u32; + let (last_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 100, + fec_set_index, + Some(merkle_root), + true, + ); + let last_index = last_data_shreds.last().unwrap().index(); + blockstore + .insert_shreds(first_data_shreds, None, false) + .unwrap(); + // This should fail for the double last_shred_in_slot flag, so we need to pass `is_trusted : true` + blockstore + .insert_shreds(last_data_shreds, None, true) + .unwrap(); + // Manually update last index flag such that the check uses shreds from both fec sets + let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); + slot_meta.last_index = Some((last_index - 3) as u64); + blockstore.put_meta(slot, &slot_meta).unwrap(); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(!results.is_full); + assert!(results.is_retransmitter_signed); + blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); + + // Slot is full, but does not contain retransmitter shreds + let fec_set_index = 0; + let (first_data_shreds, _, _) = + setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( + slot, + parent_slot, + 200, + fec_set_index, + // Do not set merkle root, so shreds are not signed + None, + true, + ); + assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); + blockstore + .insert_shreds(first_data_shreds, None, false) + .unwrap(); + let results = blockstore.check_last_fec_set(slot).unwrap(); + assert!(results.is_full); + assert!(!results.is_retransmitter_signed); + } + + #[test] + fn test_last_fec_set_check_results() { + let enabled_feature_set = FeatureSet::all_enabled(); + let disabled_feature_set = FeatureSet::default(); + let mut full_only = FeatureSet::default(); + full_only.activate(&vote_only_full_fec_sets::id(), 0); + let mut retransmitter_only = FeatureSet::default(); + retransmitter_only.activate(&verify_retransmitter_signature::id(), 0); + + let results = LastFECSetCheckResults { + is_full: false, + is_retransmitter_signed: false, + }; + assert_matches!( + results.get_result(&enabled_feature_set), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_result(&full_only), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_result(&retransmitter_only), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + results.get_result(&disabled_feature_set).unwrap(); + + let results = LastFECSetCheckResults { + is_full: true, + is_retransmitter_signed: false, + }; + assert_matches!( + results.get_result(&enabled_feature_set), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + results.get_result(&full_only).unwrap(); + assert_matches!( + results.get_result(&retransmitter_only), + Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) + ); + results.get_result(&disabled_feature_set).unwrap(); + + let results = LastFECSetCheckResults { + is_full: false, + is_retransmitter_signed: true, + }; + assert_matches!( + results.get_result(&enabled_feature_set), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + assert_matches!( + results.get_result(&full_only), + Err(BlockstoreProcessorError::IncompleteFinalFecSet) + ); + results.get_result(&retransmitter_only).unwrap(); + results.get_result(&disabled_feature_set).unwrap(); + + let results = LastFECSetCheckResults { + is_full: true, + is_retransmitter_signed: true, + }; + for feature_set in [ + enabled_feature_set, + disabled_feature_set, + full_only, + retransmitter_only, + ] { + results.get_result(&feature_set).unwrap(); + } } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index dca8d9b524c20e..00eea6f811ebcb 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -159,6 +159,8 @@ pub enum BlockstoreError { MissingShred(Slot, u64), #[error("legacy shred slot {0}, index {1}")] LegacyShred(Slot, u64), + #[error("unable to read merkle root slot {0}, index {1}")] + MissingMerkleRoot(Slot, u64), } pub type Result = std::result::Result; diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9234809b245e57..f5be5afd0b145c 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -768,6 +768,9 @@ pub enum BlockstoreProcessorError { #[error("incomplete final fec set")] IncompleteFinalFecSet, + + #[error("invalid retransmitter signature final fec set")] + InvalidRetransmitterSignatureFinalFecSet, } /// Callback for accessing bank state after each slot is confirmed while diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index f2b7a84ed1e2ca..e6d18ef16ae7ea 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -786,6 +786,22 @@ pub mod layout { .ok_or(Error::InvalidPayloadSize(shred.len())) } + pub fn is_retransmitter_signed(shred: &[u8]) -> Result { + match get_shred_variant(shred)? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant), + ShredVariant::MerkleCode { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + ShredVariant::MerkleData { + proof_size: _, + chained: _, + resigned, + } => Ok(resigned), + } + } + pub(crate) fn set_retransmitter_signature( shred: &mut [u8], signature: &Signature, From 498a1314be1cc77017d3eb1d4a2d720bd285a27e Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 17:59:20 +0000 Subject: [PATCH 02/11] pr feedback: use separate feature flag --- ledger/src/blockstore.rs | 6 +++--- sdk/src/feature_set.rs | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e73f0d9a27aea1..7521fcc2fef385 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -205,7 +205,7 @@ impl LastFECSetCheckResults { { return Err(BlockstoreProcessorError::IncompleteFinalFecSet); } else if feature_set - .is_active(&solana_sdk::feature_set::verify_retransmitter_signature::id()) + .is_active(&solana_sdk::feature_set::vote_only_retransmitter_signed_fec_sets::id()) && !self.is_retransmitter_signed { return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); @@ -5302,7 +5302,7 @@ pub mod tests { solana_runtime::bank::{Bank, RewardType}, solana_sdk::{ clock::{DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT}, - feature_set::{verify_retransmitter_signature, vote_only_full_fec_sets}, + feature_set::{vote_only_full_fec_sets, vote_only_retransmitter_signed_fec_sets}, hash::{self, hash, Hash}, instruction::CompiledInstruction, message::v0::LoadedAddresses, @@ -12154,7 +12154,7 @@ pub mod tests { let mut full_only = FeatureSet::default(); full_only.activate(&vote_only_full_fec_sets::id(), 0); let mut retransmitter_only = FeatureSet::default(); - retransmitter_only.activate(&verify_retransmitter_signature::id(), 0); + retransmitter_only.activate(&vote_only_retransmitter_signed_fec_sets::id(), 0); let results = LastFECSetCheckResults { is_full: false, diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 4e5100dc3473f8..3ff450ac20b3e1 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -841,6 +841,10 @@ pub mod ed25519_precompile_verify_strict { solana_sdk::declare_id!("ed9tNscbWLYBooxWA7FE2B5KHWs8A6sxfY8EzezEcoo"); } +pub mod vote_only_retransmitter_signed_fec_sets { + solana_sdk::declare_id!("RfEcA95xnhuwooVAhUUksEJLZBF7xKCLuqrJoqk4Zph"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -1046,6 +1050,7 @@ lazy_static! { (verify_retransmitter_signature::id(), "Verify retransmitter signature #1840"), (move_stake_and_move_lamports_ixs::id(), "Enable MoveStake and MoveLamports stake program instructions #1610"), (ed25519_precompile_verify_strict::id(), "Use strict verification in ed25519 precompile SIMD-0152"), + (vote_only_retransmitter_signed_fec_sets::id(), "vote only on retransmitter signed fec sets"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() From 3c2a3a5d7868a610bd8c57447e65fd0177eac52d Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 18:03:18 +0000 Subject: [PATCH 03/11] pr feedback: is_retransmitter_signed -> is_retransmitter_signed_variant, false for legacy --- ledger/src/blockstore.rs | 4 ++-- ledger/src/shred.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 7521fcc2fef385..b1253688203856 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3771,8 +3771,8 @@ impl Blockstore { warn!("Missing shred for {slot} index {shred_index}"); BlockstoreError::MissingShred(slot, shred_index) })?; - let is_retransmitter_signed = shred::layout::is_retransmitter_signed(&shred_bytes) - .map_err(|_| { + let is_retransmitter_signed = + shred::layout::is_retransmitter_signed_variant(&shred_bytes).map_err(|_| { let shred_index = start_index + u64::try_from(offset).unwrap(); warn!("Found legacy shred for {slot}, index {shred_index}"); BlockstoreError::LegacyShred(slot, shred_index) diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index e6d18ef16ae7ea..814ec2b5bf303a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -786,9 +786,9 @@ pub mod layout { .ok_or(Error::InvalidPayloadSize(shred.len())) } - pub fn is_retransmitter_signed(shred: &[u8]) -> Result { + pub fn is_retransmitter_signed_variant(shred: &[u8]) -> Result { match get_shred_variant(shred)? { - ShredVariant::LegacyCode | ShredVariant::LegacyData => Err(Error::InvalidShredVariant), + ShredVariant::LegacyCode | ShredVariant::LegacyData => Ok(false), ShredVariant::MerkleCode { proof_size: _, chained: _, From 9a3445a0bf7c289e0202f19ed0c89f6b58bbe5b0 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 18:04:10 +0000 Subject: [PATCH 04/11] pr feedback: update doc comment fail -> error --- ledger/src/blockstore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index b1253688203856..96a880a776b6aa 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3733,7 +3733,7 @@ impl Blockstore { /// invalid signatures on ingestion, this indicates that the last FEC set is properly /// signed by retransmitters. /// - /// Will fail if: + /// Will error if: /// - Slot meta is missing /// - LAST_SHRED_IN_SLOT flag has not been received /// - There are missing shreds in the last fec set From f0935ae43509574bd2bdec06de912f2fa242ef93 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 18:06:13 +0000 Subject: [PATCH 05/11] pr feedback: hash -> bank_hash for report metrics --- ledger/src/blockstore.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 96a880a776b6aa..aeeee913aabafd 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -179,19 +179,19 @@ pub struct LastFECSetCheckResults { } impl LastFECSetCheckResults { - pub fn report_metrics(&self, slot: Slot, hash: Hash) { + pub fn report_metrics(&self, slot: Slot, bank_hash: Hash) { if !self.is_full { datapoint_warn!( "incomplete_final_fec_set", ("slot", slot, i64), - ("hash", hash.to_string(), String) + ("bank_hash", bank_hash.to_string(), String) ); } if !self.is_retransmitter_signed { datapoint_warn!( "invalid_retransmitter_signature_final_fec_set", ("slot", slot, i64), - ("hash", hash.to_string(), String) + ("bank_hash", bank_hash.to_string(), String) ); } } From 2c11c743c0b0e82926912698a81ae876ed87a00f Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 18:34:18 +0000 Subject: [PATCH 06/11] refactor metrics inside blockstore fn, return block_id for future use --- core/src/replay_stage.rs | 70 ++++++++++--------------- ledger/src/blockstore.rs | 109 +++++++++++++++++++++++++++------------ 2 files changed, 103 insertions(+), 76 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0390b50b9d8b3a..bd0c3ad54cdd00 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3076,54 +3076,38 @@ impl ReplayStage { } } - if bank.collector_id() != my_pubkey { + let _block_id = if bank.collector_id() != my_pubkey { // If the block does not have at least DATA_SHREDS_PER_FEC_BLOCK correctly retransmitted // shreds in the last FEC set, mark it dead. No reason to perform this check on our leader block. - let check_result = match blockstore.check_last_fec_set(bank.slot()) { - Ok(last_fec_set_check_results) => { - // Update metrics regardless of feature flag - last_fec_set_check_results.report_metrics(bank_slot, bank.hash()); - // Get a final result based on the feature flags - last_fec_set_check_results.get_result(&bank.feature_set) - } - Err(e) => { - warn!( - "Unable to check the last fec set for slot {} {}, - marking as dead: {e:?}", - bank.slot(), - bank.hash() + match blockstore.check_last_fec_set_and_get_block_id( + bank.slot(), + bank.hash(), + &bank.feature_set, + ) { + Ok(block_id) => block_id, + Err(result_err) => { + let root = bank_forks.read().unwrap().root(); + Self::mark_dead_slot( + blockstore, + bank, + root, + &result_err, + rpc_subscriptions, + duplicate_slots_tracker, + duplicate_confirmed_slots, + epoch_slots_frozen_slots, + progress, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, ); - if bank - .feature_set - .is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - { - Err(BlockstoreProcessorError::IncompleteFinalFecSet) - } else { - Ok(()) - } + continue; } - }; - - if let Err(result_err) = check_result { - let root = bank_forks.read().unwrap().root(); - Self::mark_dead_slot( - blockstore, - bank, - root, - &result_err, - rpc_subscriptions, - duplicate_slots_tracker, - duplicate_confirmed_slots, - epoch_slots_frozen_slots, - progress, - heaviest_subtree_fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - purge_repair_slot_counter, - ); - continue; } - } + } else { + None + }; let r_replay_stats = replay_stats.read().unwrap(); let replay_progress = bank_progress.replay_progress.clone(); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index aeeee913aabafd..13340fee9bda31 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -174,13 +174,13 @@ impl AsRef for WorkingEntry { #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct LastFECSetCheckResults { - is_full: bool, + block_id: Option, is_retransmitter_signed: bool, } impl LastFECSetCheckResults { pub fn report_metrics(&self, slot: Slot, bank_hash: Hash) { - if !self.is_full { + if self.block_id.is_none() { datapoint_warn!( "incomplete_final_fec_set", ("slot", slot, i64), @@ -196,12 +196,12 @@ impl LastFECSetCheckResults { } } - pub fn get_result( + pub fn get_block_id( &self, feature_set: &FeatureSet, - ) -> std::result::Result<(), BlockstoreProcessorError> { + ) -> std::result::Result, BlockstoreProcessorError> { if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - && !self.is_full + && self.block_id.is_none() { return Err(BlockstoreProcessorError::IncompleteFinalFecSet); } else if feature_set @@ -210,7 +210,7 @@ impl LastFECSetCheckResults { { return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); } - Ok(()) + Ok(self.block_id) } } @@ -3724,9 +3724,39 @@ impl Blockstore { self.get_slot_entries_in_block(slot, vec![(start_index, end_index)], slot_meta) } + /// Performs checks on the last fec set of a replayed slot, and returns the block_id. + /// Returns: + /// - BlockstoreProcessorError::IncompleteFinalFecSet + /// if the last fec set is not full + /// - BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet + /// if the last fec set is not signed by retransmitters + pub fn check_last_fec_set_and_get_block_id( + &self, + slot: Slot, + bank_hash: Hash, + feature_set: &FeatureSet, + ) -> std::result::Result, BlockstoreProcessorError> { + let results = self.check_last_fec_set(slot); + let Ok(results) = results else { + warn!( + "Unable to check the last fec set for slot {} {}, + marking as dead: {results:?}", + slot, bank_hash, + ); + if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) { + return Err(BlockstoreProcessorError::IncompleteFinalFecSet); + } + return Ok(None); + }; + // Update metrics + results.report_metrics(slot, bank_hash); + // Return block id / error based on feature flags + results.get_block_id(feature_set) + } + /// Performs checks on the last FEC set for this slot. - /// - `is_full` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of - /// `slot` have the same merkle root, indicating they are a part of the same FEC set. + /// - `block_id` will be `Some(mr)` if the last `DATA_SHREDS_PER_FEC_BLOCK` data shreds of + /// `slot` have the same merkle root of `mr`, indicating they are a part of the same FEC set. /// This indicates that the last FEC set is sufficiently sized. /// - `is_retransmitter_signed` will be true if the last `DATA_SHREDS_PER_FEC_BLOCK` /// data shreds of `slot` are of the retransmitter variant. Since we already discard @@ -3738,7 +3768,7 @@ impl Blockstore { /// - LAST_SHRED_IN_SLOT flag has not been received /// - There are missing shreds in the last fec set /// - The block contains legacy shreds - pub fn check_last_fec_set(&self, slot: Slot) -> Result { + fn check_last_fec_set(&self, slot: Slot) -> Result { // We need to check if the last FEC set index contains at least `DATA_SHREDS_PER_FEC_BLOCK` data shreds. // We compare the merkle roots of the last `DATA_SHREDS_PER_FEC_BLOCK` shreds in this block. // Since the merkle root contains the fec_set_index, if all of them match, we know that the last fec set has @@ -3754,7 +3784,7 @@ impl Blockstore { let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else { warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1); return Ok(LastFECSetCheckResults { - is_full: false, + block_id: None, is_retransmitter_signed: false, }); }; @@ -3804,7 +3834,7 @@ impl Blockstore { // After the dedup there should be exactly one Hash left and one true value Ok(LastFECSetCheckResults { - is_full: merkle_root.is_some(), + block_id: merkle_root, is_retransmitter_signed, }) } @@ -11999,9 +12029,10 @@ pub mod tests { blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); // Full slot + let block_id = data_shreds[0].merkle_root().unwrap(); blockstore.insert_shreds(data_shreds, None, false).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(results.is_full); + assert_eq!(results.block_id, Some(block_id)); assert!(results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12041,7 +12072,7 @@ pub mod tests { slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(!results.is_full); + assert!(results.block_id.is_none()); assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12082,7 +12113,7 @@ pub mod tests { slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(!results.is_full); + assert!(results.block_id.is_none()); assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12122,7 +12153,7 @@ pub mod tests { slot_meta.last_index = Some((last_index - 3) as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(!results.is_full); + assert!(results.block_id.is_none()); assert!(results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12139,11 +12170,12 @@ pub mod tests { true, ); assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); + let block_id = first_data_shreds[0].merkle_root().unwrap(); blockstore .insert_shreds(first_data_shreds, None, false) .unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(results.is_full); + assert_eq!(results.block_id, Some(block_id)); assert!(!results.is_retransmitter_signed); } @@ -12157,55 +12189,66 @@ pub mod tests { retransmitter_only.activate(&vote_only_retransmitter_signed_fec_sets::id(), 0); let results = LastFECSetCheckResults { - is_full: false, + block_id: None, is_retransmitter_signed: false, }; assert_matches!( - results.get_result(&enabled_feature_set), + results.get_block_id(&enabled_feature_set), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_result(&full_only), + results.get_block_id(&full_only), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_result(&retransmitter_only), + results.get_block_id(&retransmitter_only), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); - results.get_result(&disabled_feature_set).unwrap(); + assert!(results + .get_block_id(&disabled_feature_set) + .unwrap() + .is_none()); + let block_id = Hash::new_unique(); let results = LastFECSetCheckResults { - is_full: true, + block_id: Some(block_id), is_retransmitter_signed: false, }; assert_matches!( - results.get_result(&enabled_feature_set), + results.get_block_id(&enabled_feature_set), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); - results.get_result(&full_only).unwrap(); + assert_eq!(results.get_block_id(&full_only).unwrap(), Some(block_id)); assert_matches!( - results.get_result(&retransmitter_only), + results.get_block_id(&retransmitter_only), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); - results.get_result(&disabled_feature_set).unwrap(); + assert_eq!( + results.get_block_id(&disabled_feature_set).unwrap(), + Some(block_id) + ); let results = LastFECSetCheckResults { - is_full: false, + block_id: None, is_retransmitter_signed: true, }; assert_matches!( - results.get_result(&enabled_feature_set), + results.get_block_id(&enabled_feature_set), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_result(&full_only), + results.get_block_id(&full_only), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); - results.get_result(&retransmitter_only).unwrap(); - results.get_result(&disabled_feature_set).unwrap(); + assert!(results.get_block_id(&retransmitter_only).unwrap().is_none()); + assert!(results + .get_block_id(&disabled_feature_set) + .unwrap() + .is_none()); + let block_id = Hash::new_unique(); let results = LastFECSetCheckResults { - is_full: true, + block_id: Some(block_id), is_retransmitter_signed: true, }; for feature_set in [ @@ -12214,7 +12257,7 @@ pub mod tests { full_only, retransmitter_only, ] { - results.get_result(&feature_set).unwrap(); + assert_eq!(results.get_block_id(&feature_set).unwrap(), Some(block_id)); } } } From 4ecfbba0368c3bb683508d6ae780a4324685d92c Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 16 Jul 2024 19:26:45 +0000 Subject: [PATCH 07/11] pr feedback: gate metrics reporting --- core/src/replay_stage.rs | 1 + ledger/src/blockstore.rs | 13 +++++++++---- ledger/src/shred.rs | 6 +++++- .../src/broadcast_stage/standard_broadcast_run.rs | 9 ++++----- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index bd0c3ad54cdd00..3aadd7606a6d0c 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3082,6 +3082,7 @@ impl ReplayStage { match blockstore.check_last_fec_set_and_get_block_id( bank.slot(), bank.hash(), + bank.cluster_type(), &bank.feature_set, ) { Ok(block_id) => block_id, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 13340fee9bda31..588f9991f04b2c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -49,7 +49,9 @@ use { address_lookup_table::state::AddressLookupTable, clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, feature_set::FeatureSet, - genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, + genesis_config::{ + ClusterType, GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE, + }, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, @@ -179,7 +181,7 @@ pub struct LastFECSetCheckResults { } impl LastFECSetCheckResults { - pub fn report_metrics(&self, slot: Slot, bank_hash: Hash) { + pub fn report_metrics(&self, slot: Slot, bank_hash: Hash, cluster_type: ClusterType) { if self.block_id.is_none() { datapoint_warn!( "incomplete_final_fec_set", @@ -187,7 +189,9 @@ impl LastFECSetCheckResults { ("bank_hash", bank_hash.to_string(), String) ); } - if !self.is_retransmitter_signed { + // These metrics are expensive to send because hash does not compress well. + // Only send these metrics when we are sure the appropriate shred format is being sent + if !self.is_retransmitter_signed && shred::should_chain_merkle_shreds(slot, cluster_type) { datapoint_warn!( "invalid_retransmitter_signature_final_fec_set", ("slot", slot, i64), @@ -3734,6 +3738,7 @@ impl Blockstore { &self, slot: Slot, bank_hash: Hash, + cluster_type: ClusterType, feature_set: &FeatureSet, ) -> std::result::Result, BlockstoreProcessorError> { let results = self.check_last_fec_set(slot); @@ -3749,7 +3754,7 @@ impl Blockstore { return Ok(None); }; // Update metrics - results.report_metrics(slot, bank_hash); + results.report_metrics(slot, bank_hash, cluster_type); // Return block id / error based on feature flags results.get_block_id(feature_set) } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 814ec2b5bf303a..3086cd7894eae7 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -341,7 +341,7 @@ macro_rules! dispatch { } } -use dispatch; +use {dispatch, solana_sdk::genesis_config::ClusterType}; impl Shred { dispatch!(fn common_header(&self) -> &ShredCommonHeader); @@ -1283,6 +1283,10 @@ pub fn verify_test_data_shred( } } +pub fn should_chain_merkle_shreds(_slot: Slot, cluster_type: ClusterType) -> bool { + cluster_type == ClusterType::Development +} + #[cfg(test)] mod tests { use { diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index f108fc08226a6b..e4ec736f184da0 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -9,7 +9,10 @@ use { solana_entry::entry::Entry, solana_ledger::{ blockstore, - shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, + shred::{ + should_chain_merkle_shreds, shred_code, ProcessShredsStats, ReedSolomonCache, Shred, + ShredFlags, Shredder, + }, }, solana_sdk::{ genesis_config::ClusterType, @@ -506,10 +509,6 @@ impl BroadcastRun for StandardBroadcastRun { } } -fn should_chain_merkle_shreds(_slot: Slot, cluster_type: ClusterType) -> bool { - cluster_type == ClusterType::Development -} - #[cfg(test)] mod test { use { From 9916e60f1ef77a6b66bf7b1911ee54aa8b9d8f2a Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 17 Jul 2024 19:50:33 +0000 Subject: [PATCH 08/11] pr feedback: do not distinguish impossible combos, simplify check code --- ledger/src/blockstore.rs | 68 ++++++---------------------------------- 1 file changed, 10 insertions(+), 58 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 588f9991f04b2c..13c79971910024 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3795,7 +3795,7 @@ impl Blockstore { }; let keys = (start_index..=last_shred_index).map(|index| (slot, index)); - let (merkle_root, is_retransmitter_signed) = self + let deduped_shred_checks: Vec<(Hash, bool)> = self .data_shred_cf .multi_get_bytes(keys) .into_iter() @@ -3820,26 +3820,18 @@ impl Blockstore { })?; Ok((merkle_root, is_retransmitter_signed)) }) - .process_results::<_, _, BlockstoreError, _>(|mut iter| { - // lift and dedup merkle root and retransmitter individually to avoid allocation - // and still be able to report metrics accurately for each check. - let Some((merkle_root, retransmitter_signed)) = iter.next() else { - return (None, false); - }; - iter.fold( - (Some(merkle_root), retransmitter_signed), - |(mr_acc, rs_acc), (mr, rs)| { - ( - mr_acc.and_then(|mr_acc| (mr_acc == mr).then_some(mr)), - rs_acc && rs, - ) - }, - ) - })?; + .dedup_by(|res1, res2| res1.as_ref().ok() == res2.as_ref().ok()) + .collect::>>()?; // After the dedup there should be exactly one Hash left and one true value + let &[(block_id, is_retransmitter_signed)] = deduped_shred_checks.as_slice() else { + return Ok(LastFECSetCheckResults { + block_id: None, + is_retransmitter_signed: false, + }); + }; Ok(LastFECSetCheckResults { - block_id: merkle_root, + block_id: Some(block_id), is_retransmitter_signed, }) } @@ -12122,46 +12114,6 @@ pub mod tests { assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); - // Slot contains retransmitter signed shreds, but they are part of 2 different FEC sets - let mut fec_set_index = 0; - let (first_data_shreds, _, _) = - setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( - slot, - parent_slot, - 100, - fec_set_index, - // Set merkle root here to make sure it is a resigned shred - Some(Hash::new_from_array(rand::thread_rng().gen())), - true, - ); - let merkle_root = first_data_shreds[0].merkle_root().unwrap(); - fec_set_index += first_data_shreds.len() as u32; - let (last_data_shreds, _, _) = - setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( - slot, - parent_slot, - 100, - fec_set_index, - Some(merkle_root), - true, - ); - let last_index = last_data_shreds.last().unwrap().index(); - blockstore - .insert_shreds(first_data_shreds, None, false) - .unwrap(); - // This should fail for the double last_shred_in_slot flag, so we need to pass `is_trusted : true` - blockstore - .insert_shreds(last_data_shreds, None, true) - .unwrap(); - // Manually update last index flag such that the check uses shreds from both fec sets - let mut slot_meta = blockstore.meta(slot).unwrap().unwrap(); - slot_meta.last_index = Some((last_index - 3) as u64); - blockstore.put_meta(slot, &slot_meta).unwrap(); - let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(results.block_id.is_none()); - assert!(results.is_retransmitter_signed); - blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); - // Slot is full, but does not contain retransmitter shreds let fec_set_index = 0; let (first_data_shreds, _, _) = From b27d78e21a82641609a9e76e5d82aa864cf3977a Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 17 Jul 2024 19:53:44 +0000 Subject: [PATCH 09/11] pr feedback: remove report_metrics helper fn --- ledger/src/blockstore.rs | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 13c79971910024..e9d7537fcb7f63 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -181,26 +181,7 @@ pub struct LastFECSetCheckResults { } impl LastFECSetCheckResults { - pub fn report_metrics(&self, slot: Slot, bank_hash: Hash, cluster_type: ClusterType) { - if self.block_id.is_none() { - datapoint_warn!( - "incomplete_final_fec_set", - ("slot", slot, i64), - ("bank_hash", bank_hash.to_string(), String) - ); - } - // These metrics are expensive to send because hash does not compress well. - // Only send these metrics when we are sure the appropriate shred format is being sent - if !self.is_retransmitter_signed && shred::should_chain_merkle_shreds(slot, cluster_type) { - datapoint_warn!( - "invalid_retransmitter_signature_final_fec_set", - ("slot", slot, i64), - ("bank_hash", bank_hash.to_string(), String) - ); - } - } - - pub fn get_block_id( + fn get_block_id( &self, feature_set: &FeatureSet, ) -> std::result::Result, BlockstoreProcessorError> { @@ -3754,7 +3735,18 @@ impl Blockstore { return Ok(None); }; // Update metrics - results.report_metrics(slot, bank_hash, cluster_type); + if results.block_id.is_none() { + datapoint_warn!("incomplete_final_fec_set", ("slot", slot, i64),); + } + // These metrics are expensive to send because hash does not compress well. + // Only send these metrics when we are sure the appropriate shred format is being sent + if !results.is_retransmitter_signed && shred::should_chain_merkle_shreds(slot, cluster_type) + { + datapoint_warn!( + "invalid_retransmitter_signature_final_fec_set", + ("slot", slot, i64), + ); + } // Return block id / error based on feature flags results.get_block_id(feature_set) } From c88824c1a2264710b33b0367a82a4d7b79b13621 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 18 Jul 2024 15:52:03 +0000 Subject: [PATCH 10/11] pr feedback: remove metric --- core/src/replay_stage.rs | 1 - ledger/src/blockstore.rs | 12 +----------- ledger/src/shred.rs | 6 +----- .../src/broadcast_stage/standard_broadcast_run.rs | 9 +++++---- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3aadd7606a6d0c..bd0c3ad54cdd00 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3082,7 +3082,6 @@ impl ReplayStage { match blockstore.check_last_fec_set_and_get_block_id( bank.slot(), bank.hash(), - bank.cluster_type(), &bank.feature_set, ) { Ok(block_id) => block_id, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e9d7537fcb7f63..4e6236540394a0 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -50,7 +50,7 @@ use { clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, feature_set::FeatureSet, genesis_config::{ - ClusterType, GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE, + GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE, }, hash::Hash, pubkey::Pubkey, @@ -3719,7 +3719,6 @@ impl Blockstore { &self, slot: Slot, bank_hash: Hash, - cluster_type: ClusterType, feature_set: &FeatureSet, ) -> std::result::Result, BlockstoreProcessorError> { let results = self.check_last_fec_set(slot); @@ -3738,15 +3737,6 @@ impl Blockstore { if results.block_id.is_none() { datapoint_warn!("incomplete_final_fec_set", ("slot", slot, i64),); } - // These metrics are expensive to send because hash does not compress well. - // Only send these metrics when we are sure the appropriate shred format is being sent - if !results.is_retransmitter_signed && shred::should_chain_merkle_shreds(slot, cluster_type) - { - datapoint_warn!( - "invalid_retransmitter_signature_final_fec_set", - ("slot", slot, i64), - ); - } // Return block id / error based on feature flags results.get_block_id(feature_set) } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 3086cd7894eae7..814ec2b5bf303a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -341,7 +341,7 @@ macro_rules! dispatch { } } -use {dispatch, solana_sdk::genesis_config::ClusterType}; +use dispatch; impl Shred { dispatch!(fn common_header(&self) -> &ShredCommonHeader); @@ -1283,10 +1283,6 @@ pub fn verify_test_data_shred( } } -pub fn should_chain_merkle_shreds(_slot: Slot, cluster_type: ClusterType) -> bool { - cluster_type == ClusterType::Development -} - #[cfg(test)] mod tests { use { diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index e4ec736f184da0..f108fc08226a6b 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -9,10 +9,7 @@ use { solana_entry::entry::Entry, solana_ledger::{ blockstore, - shred::{ - should_chain_merkle_shreds, shred_code, ProcessShredsStats, ReedSolomonCache, Shred, - ShredFlags, Shredder, - }, + shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, }, solana_sdk::{ genesis_config::ClusterType, @@ -509,6 +506,10 @@ impl BroadcastRun for StandardBroadcastRun { } } +fn should_chain_merkle_shreds(_slot: Slot, cluster_type: ClusterType) -> bool { + cluster_type == ClusterType::Development +} + #[cfg(test)] mod test { use { From df32e7b3e6fda9d61a6e25ce646a5340e0c1b36e Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 18 Jul 2024 15:59:19 +0000 Subject: [PATCH 11/11] pr feedback: block_id -> last_fec_set_merkle_root --- ledger/src/blockstore.rs | 75 ++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 4e6236540394a0..8c742fd3c25121 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -49,9 +49,7 @@ use { address_lookup_table::state::AddressLookupTable, clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND}, feature_set::FeatureSet, - genesis_config::{ - GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE, - }, + genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, @@ -176,17 +174,17 @@ impl AsRef for WorkingEntry { #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct LastFECSetCheckResults { - block_id: Option, + last_fec_set_merkle_root: Option, is_retransmitter_signed: bool, } impl LastFECSetCheckResults { - fn get_block_id( + fn get_last_fec_set_merkle_root( &self, feature_set: &FeatureSet, ) -> std::result::Result, BlockstoreProcessorError> { if feature_set.is_active(&solana_sdk::feature_set::vote_only_full_fec_sets::id()) - && self.block_id.is_none() + && self.last_fec_set_merkle_root.is_none() { return Err(BlockstoreProcessorError::IncompleteFinalFecSet); } else if feature_set @@ -195,7 +193,7 @@ impl LastFECSetCheckResults { { return Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet); } - Ok(self.block_id) + Ok(self.last_fec_set_merkle_root) } } @@ -3734,11 +3732,11 @@ impl Blockstore { return Ok(None); }; // Update metrics - if results.block_id.is_none() { + if results.last_fec_set_merkle_root.is_none() { datapoint_warn!("incomplete_final_fec_set", ("slot", slot, i64),); } // Return block id / error based on feature flags - results.get_block_id(feature_set) + results.get_last_fec_set_merkle_root(feature_set) } /// Performs checks on the last FEC set for this slot. @@ -3771,7 +3769,7 @@ impl Blockstore { let Some(start_index) = last_shred_index.checked_sub(MINIMUM_INDEX) else { warn!("Slot {slot} has only {} shreds, fewer than the {DATA_SHREDS_PER_FEC_BLOCK} required", last_shred_index + 1); return Ok(LastFECSetCheckResults { - block_id: None, + last_fec_set_merkle_root: None, is_retransmitter_signed: false, }); }; @@ -3808,12 +3806,12 @@ impl Blockstore { // After the dedup there should be exactly one Hash left and one true value let &[(block_id, is_retransmitter_signed)] = deduped_shred_checks.as_slice() else { return Ok(LastFECSetCheckResults { - block_id: None, + last_fec_set_merkle_root: None, is_retransmitter_signed: false, }); }; Ok(LastFECSetCheckResults { - block_id: Some(block_id), + last_fec_set_merkle_root: Some(block_id), is_retransmitter_signed, }) } @@ -12011,7 +12009,7 @@ pub mod tests { let block_id = data_shreds[0].merkle_root().unwrap(); blockstore.insert_shreds(data_shreds, None, false).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert_eq!(results.block_id, Some(block_id)); + assert_eq!(results.last_fec_set_merkle_root, Some(block_id)); assert!(results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12051,7 +12049,7 @@ pub mod tests { slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(results.block_id.is_none()); + assert!(results.last_fec_set_merkle_root.is_none()); assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12092,7 +12090,7 @@ pub mod tests { slot_meta.last_index = Some(last_index as u64); blockstore.put_meta(slot, &slot_meta).unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert!(results.block_id.is_none()); + assert!(results.last_fec_set_merkle_root.is_none()); assert!(!results.is_retransmitter_signed); blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); @@ -12114,7 +12112,7 @@ pub mod tests { .insert_shreds(first_data_shreds, None, false) .unwrap(); let results = blockstore.check_last_fec_set(slot).unwrap(); - assert_eq!(results.block_id, Some(block_id)); + assert_eq!(results.last_fec_set_merkle_root, Some(block_id)); assert!(!results.is_retransmitter_signed); } @@ -12128,66 +12126,74 @@ pub mod tests { retransmitter_only.activate(&vote_only_retransmitter_signed_fec_sets::id(), 0); let results = LastFECSetCheckResults { - block_id: None, + last_fec_set_merkle_root: None, is_retransmitter_signed: false, }; assert_matches!( - results.get_block_id(&enabled_feature_set), + results.get_last_fec_set_merkle_root(&enabled_feature_set), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_block_id(&full_only), + results.get_last_fec_set_merkle_root(&full_only), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_block_id(&retransmitter_only), + results.get_last_fec_set_merkle_root(&retransmitter_only), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); assert!(results - .get_block_id(&disabled_feature_set) + .get_last_fec_set_merkle_root(&disabled_feature_set) .unwrap() .is_none()); let block_id = Hash::new_unique(); let results = LastFECSetCheckResults { - block_id: Some(block_id), + last_fec_set_merkle_root: Some(block_id), is_retransmitter_signed: false, }; assert_matches!( - results.get_block_id(&enabled_feature_set), + results.get_last_fec_set_merkle_root(&enabled_feature_set), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); - assert_eq!(results.get_block_id(&full_only).unwrap(), Some(block_id)); + assert_eq!( + results.get_last_fec_set_merkle_root(&full_only).unwrap(), + Some(block_id) + ); assert_matches!( - results.get_block_id(&retransmitter_only), + results.get_last_fec_set_merkle_root(&retransmitter_only), Err(BlockstoreProcessorError::InvalidRetransmitterSignatureFinalFecSet) ); assert_eq!( - results.get_block_id(&disabled_feature_set).unwrap(), + results + .get_last_fec_set_merkle_root(&disabled_feature_set) + .unwrap(), Some(block_id) ); let results = LastFECSetCheckResults { - block_id: None, + last_fec_set_merkle_root: None, is_retransmitter_signed: true, }; assert_matches!( - results.get_block_id(&enabled_feature_set), + results.get_last_fec_set_merkle_root(&enabled_feature_set), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); assert_matches!( - results.get_block_id(&full_only), + results.get_last_fec_set_merkle_root(&full_only), Err(BlockstoreProcessorError::IncompleteFinalFecSet) ); - assert!(results.get_block_id(&retransmitter_only).unwrap().is_none()); assert!(results - .get_block_id(&disabled_feature_set) + .get_last_fec_set_merkle_root(&retransmitter_only) + .unwrap() + .is_none()); + assert!(results + .get_last_fec_set_merkle_root(&disabled_feature_set) .unwrap() .is_none()); let block_id = Hash::new_unique(); let results = LastFECSetCheckResults { - block_id: Some(block_id), + last_fec_set_merkle_root: Some(block_id), is_retransmitter_signed: true, }; for feature_set in [ @@ -12196,7 +12202,10 @@ pub mod tests { full_only, retransmitter_only, ] { - assert_eq!(results.get_block_id(&feature_set).unwrap(), Some(block_id)); + assert_eq!( + results.get_last_fec_set_merkle_root(&feature_set).unwrap(), + Some(block_id) + ); } } }