diff --git a/Cargo.lock b/Cargo.lock index 86893e45fb9..f067a4e5124 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6133,6 +6133,7 @@ dependencies = [ "solana-version", "solana-vote-program", "static_assertions", + "test-case", "thiserror", ] diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f01bf15fa93..eb9906c5c31 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2824,6 +2824,28 @@ impl ReplayStage { purge_repair_slot_counter, SlotStateUpdate::BankFrozen(bank_frozen_state), ); + // If we are replaying from ledger and previously marked this slot as duplicate, let the state machine know + if blockstore.get_duplicate_slot(bank.slot()).is_some() { + let duplicate_state = DuplicateState::new_from_state( + bank.slot(), + gossip_duplicate_confirmed_slots, + heaviest_subtree_fork_choice, + || false, + || Some(bank.hash()), + ); + check_slot_agrees_with_cluster( + bank.slot(), + bank_forks.read().unwrap().root(), + blockstore, + duplicate_slots_tracker, + epoch_slots_frozen_slots, + heaviest_subtree_fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + SlotStateUpdate::Duplicate(duplicate_state), + ); + } if let Some(sender) = bank_notification_sender { sender .sender diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 040bac55f6a..4b41801ec38 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -214,7 +214,7 @@ impl Tvu { leader_schedule_cache.clone(), verified_vote_receiver, completed_data_sets_sender, - duplicate_slots_sender, + duplicate_slots_sender.clone(), ancestor_hashes_replay_update_receiver, dumped_slots_receiver, popular_pruned_forks_sender, @@ -322,6 +322,7 @@ impl Tvu { blockstore, leader_schedule_cache.clone(), bank_forks.clone(), + duplicate_slots_sender, ), ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 88dc0034d0e..381382915bf 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -19,7 +19,7 @@ use { 
rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ - blockstore::{Blockstore, BlockstoreInsertionMetrics}, + blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred}, leader_schedule_cache::LeaderScheduleCache, shred::{self, Nonce, ReedSolomonCache, Shred}, }, @@ -138,23 +138,41 @@ impl WindowServiceMetrics { fn run_check_duplicate( cluster_info: &ClusterInfo, blockstore: &Blockstore, - shred_receiver: &Receiver, + shred_receiver: &Receiver, duplicate_slots_sender: &DuplicateSlotSender, ) -> Result<()> { - let check_duplicate = |shred: Shred| -> Result<()> { + let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> { let shred_slot = shred.slot(); - if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { - if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) { - cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; - blockstore.store_duplicate_slot( - shred_slot, - existing_shred_payload, - shred.into_payload(), - )?; - - duplicate_slots_sender.send(shred_slot)?; + let (shred1, shred2) = match shred { + PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::Exists(shred) => { + // Unlike the other cases we have to wait until here to decide to handle the duplicate and store + // in blockstore. This is because the duplicate could have been part of the same insert batch, + // so we wait until the batch has been written. 
+ if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { + if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) { + blockstore.store_duplicate_slot( + shred_slot, + existing_shred_payload.clone(), + shred.clone().into_payload(), + )?; + (shred, existing_shred_payload) + } else { + // Shred is not duplicate + return Ok(()); + } + } else { + // Shred has already been handled + return Ok(()); + } } - } + }; + + // Propagate duplicate proof through gossip + cluster_info.push_duplicate_shred(&shred1, &shred2)?; + // Notify duplicate consensus state machine + duplicate_slots_sender.send(shred_slot)?; Ok(()) }; @@ -226,7 +244,7 @@ fn run_insert( reed_solomon_cache: &ReedSolomonCache, ) -> Result<()> where - F: Fn(Shred), + F: Fn(PossibleDuplicateShred), { const RECV_TIMEOUT: Duration = Duration::from_millis(200); let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed"); @@ -370,7 +388,7 @@ impl WindowService { cluster_info: Arc, exit: Arc, blockstore: Arc, - duplicate_receiver: Receiver, + duplicate_receiver: Receiver, duplicate_slots_sender: DuplicateSlotSender, ) -> JoinHandle<()> { let handle_error = || { @@ -400,7 +418,7 @@ impl WindowService { blockstore: Arc, leader_schedule_cache: Arc, verified_receiver: Receiver>, - check_duplicate_sender: Sender, + check_duplicate_sender: Sender, completed_data_sets_sender: CompletedDataSetsSender, retransmit_sender: Sender>, outstanding_requests: Arc>, @@ -417,8 +435,8 @@ impl WindowService { Builder::new() .name("solWinInsert".to_string()) .spawn(move || { - let handle_duplicate = |shred| { - let _ = check_duplicate_sender.send(shred); + let handle_duplicate = |possible_duplicate_shred| { + let _ = check_duplicate_sender.send(possible_duplicate_shred); }; let mut metrics = BlockstoreInsertionMetrics::default(); let mut ws_metrics = WindowServiceMetrics::default(); @@ -551,7 +569,9 @@ mod test { }; assert_eq!(duplicate_shred.slot(), shreds[0].slot()); let duplicate_shred_slot = 
duplicate_shred.slot(); - sender.send(duplicate_shred.clone()).unwrap(); + sender + .send(PossibleDuplicateShred::Exists(duplicate_shred.clone())) + .unwrap(); assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); let keypair = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 7ba37448c42..4c93efe8f21 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -53,6 +53,7 @@ thiserror = { workspace = true } [dev-dependencies] num_cpus = { workspace = true } serial_test = { workspace = true } +test-case = { workspace = true } [build-dependencies] rustc_version = { workspace = true } diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 6ab52edd268..41a0e4c9ab4 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -149,7 +149,7 @@ impl CrdsGossip { let now = timestamp(); for entry in entries { if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) { - error!("push_duplicate_shred faild: {:?}", err); + error!("push_duplicate_shred failed: {:?}", err); } } Ok(()) diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 24c4f2eb000..ab6e3642942 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -3,7 +3,7 @@ use { itertools::Itertools, solana_ledger::{ blockstore::BlockstoreError, - blockstore_meta::DuplicateSlotProof, + blockstore_meta::{DuplicateSlotProof, ErasureMeta}, shred::{self, Shred, ShredType}, }, solana_sdk::{ @@ -29,7 +29,8 @@ pub struct DuplicateShred { pub(crate) from: Pubkey, pub(crate) wallclock: u64, pub(crate) slot: Slot, - shred_index: u32, + // Shred index of the first shred in the proof + pub(crate) shred_index: u32, shred_type: ShredType, // Serialized DuplicateSlotProof split into chunks. 
num_chunks: u8, @@ -84,35 +85,73 @@ pub enum Error { TryFromIntError(#[from] TryFromIntError), #[error("unknown slot leader: {0}")] UnknownSlotLeader(Slot), + #[error("invalid last index conflict")] + InvalidLastIndexConflict, + #[error("invalid erasure meta conflict")] + InvalidErasureMetaConflict, + #[error("unable to send duplicate slot to state machine")] + DuplicateSlotSenderFailure, } -// Asserts that the two shreds can indicate duplicate proof for -// the same triplet of (slot, shred-index, and shred-type_), and -// that they have valid signatures from the slot leader. +/// Check that `shred1` and `shred2` indicate a valid duplicate proof +/// - Must be for the same slot +/// - Must have the same `shred_type` +/// - Must both sigverify for the correct leader +/// - If `shred1` and `shred2` share the same index they must be not equal +/// - If `shred1` and `shred2` do not share the same index and are data shreds +/// verify that they indicate an index conflict. One of them must be the +/// LAST_SHRED_IN_SLOT, however the other shred must have a higher index. +/// - If `shred1` and `shred2` do not share the same index and are coding shreds +/// verify that they have conflicting erasure metas fn check_shreds(leader_schedule: Option, shred1: &Shred, shred2: &Shred) -> Result<(), Error> where F: FnOnce(Slot) -> Option, { if shred1.slot() != shred2.slot() { - Err(Error::SlotMismatch) - } else if shred1.index() != shred2.index() { - // TODO: Should also allow two coding shreds with different indices but - // same fec-set-index and mismatching erasure-config. 
- Err(Error::ShredIndexMismatch) - } else if shred1.shred_type() != shred2.shred_type() { - Err(Error::ShredTypeMismatch) - } else if shred1.payload() == shred2.payload() { - Err(Error::InvalidDuplicateShreds) - } else { - if let Some(leader_schedule) = leader_schedule { - let slot_leader = - leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?; - if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { - return Err(Error::InvalidSignature); - } + return Err(Error::SlotMismatch); + } + + if shred1.shred_type() != shred2.shred_type() { + return Err(Error::ShredTypeMismatch); + } + + if shred1.index() == shred2.index() && shred1.payload() == shred2.payload() { + return Err(Error::InvalidDuplicateShreds); + } + + if shred1.index() != shred2.index() && shred1.shred_type() == ShredType::Data { + match ( + shred1.index(), + shred1.last_in_slot(), + shred2.index(), + shred2.last_in_slot(), + ) { + (ix1, true, ix2, false) if ix1 > ix2 => return Err(Error::InvalidLastIndexConflict), + (ix1, false, ix2, true) if ix1 < ix2 => return Err(Error::InvalidLastIndexConflict), + (_, false, _, false) => return Err(Error::InvalidLastIndexConflict), + _ => (), + } + } + + if shred1.index() != shred2.index() && shred1.shred_type() == ShredType::Code { + if shred1.fec_set_index() != shred2.fec_set_index() { + return Err(Error::InvalidErasureMetaConflict); + } + let erasure_meta = + ErasureMeta::from_coding_shred(shred1).expect("Shred1 should be a coding shred"); + if erasure_meta.check_coding_shred(shred2) { + return Err(Error::InvalidErasureMetaConflict); } - Ok(()) } + + if let Some(leader_schedule) = leader_schedule { + let slot_leader = + leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?; + if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { + return Err(Error::InvalidSignature); + } + } + Ok(()) } pub(crate) fn from_shred( @@ -231,15 +270,12 @@ pub(crate) fn into_shreds( let shred2 = 
Shred::new_from_serialized_shred(proof.shred2)?; if shred1.slot() != slot || shred2.slot() != slot { Err(Error::SlotMismatch) - } else if shred1.index() != shred_index || shred2.index() != shred_index { + } else if shred1.index() != shred_index && shred2.index() != shred_index { Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type { Err(Error::ShredTypeMismatch) - } else if shred1.payload() == shred2.payload() { - Err(Error::InvalidDuplicateShreds) - } else if !shred1.verify(slot_leader) || !shred2.verify(slot_leader) { - Err(Error::InvalidSignature) } else { + check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?; Ok((shred1, shred2)) } } @@ -267,6 +303,7 @@ pub(crate) mod tests { system_transaction, }, std::sync::Arc, + test_case::test_case, }; #[test] @@ -297,6 +334,71 @@ pub(crate) mod tests { shredder: &Shredder, keypair: &Keypair, ) -> Shred { + let (mut data_shreds, _) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + 5, + true, + shredder, + keypair, + true, + ); + data_shreds.pop().unwrap() + } + + fn new_rand_data_shred( + rng: &mut R, + next_shred_index: u32, + shredder: &Shredder, + keypair: &Keypair, + merkle_variant: bool, + is_last_in_slot: bool, + ) -> Shred { + let (mut data_shreds, _) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + 5, + merkle_variant, + shredder, + keypair, + is_last_in_slot, + ); + data_shreds.pop().unwrap() + } + + fn new_rand_coding_shreds( + rng: &mut R, + next_shred_index: u32, + num_entries: usize, + shredder: &Shredder, + keypair: &Keypair, + merkle_variant: bool, + ) -> Vec { + let (_, coding_shreds) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + num_entries, + merkle_variant, + shredder, + keypair, + true, + ); + coding_shreds + } + + fn new_rand_shreds( + rng: &mut R, + next_shred_index: u32, + next_code_index: u32, + num_entries: usize, + merkle_variant: bool, + shredder: &Shredder, + 
keypair: &Keypair, + is_last_in_slot: bool, + ) -> (Vec, Vec) { let entries: Vec<_> = std::iter::repeat_with(|| { let tx = system_transaction::transfer( &Keypair::new(), // from @@ -310,30 +412,76 @@ pub(crate) mod tests { vec![tx], // transactions ) }) - .take(5) + .take(num_entries) .collect(); - let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds( + shredder.entries_to_shreds( keypair, &entries, - true, // is_last_in_slot + is_last_in_slot, next_shred_index, - next_shred_index, // next_code_index - true, // merkle_variant + next_code_index, // next_code_index + merkle_variant, &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), - ); - data_shreds.swap_remove(0) + ) } - #[test] - fn test_duplicate_shred_round_trip() { + fn from_shred_bypass_checks( + shred: Shred, + self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value. + other_shred: Shred, + wallclock: u64, + max_size: usize, // Maximum serialized size of each DuplicateShred. + ) -> Result, Error> { + let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type()); + let proof = DuplicateSlotProof { + shred1: shred.into_payload(), + shred2: other_shred.into_payload(), + }; + let data = bincode::serialize(&proof)?; + let chunk_size = max_size - DUPLICATE_SHRED_HEADER_SIZE; + let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect(); + let num_chunks = u8::try_from(chunks.len())?; + let chunks = chunks + .into_iter() + .enumerate() + .map(move |(i, chunk)| DuplicateShred { + from: self_pubkey, + wallclock, + slot, + shred_index, + shred_type, + num_chunks, + chunk_index: i as u8, + chunk, + }); + Ok(chunks) + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_duplicate_shred_round_trip(merkle_variant: bool) { let mut rng = rand::thread_rng(); let leader = Arc::new(Keypair::new()); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, 
reference_tick, version).unwrap(); let next_shred_index = rng.gen_range(0, 32_000); - let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + let shred1 = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); + let shred2 = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) @@ -356,4 +504,461 @@ pub(crate) mod tests { assert_eq!(shred1, shred3); assert_eq!(shred2, shred4); } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_duplicate_shred_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0, 32_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let data_shred = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let test_cases = vec![ + // Same data_shred + (data_shred.clone(), data_shred), + // Same coding_shred + (coding_shreds[0].clone(), coding_shreds[0].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidDuplicateShreds + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // 
self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidDuplicateSlotProof + ); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_latest_index_conflict_round_trip(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0, 31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let test_cases = vec![ + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + let chunks: Vec<_> = from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + 
.unwrap() + .collect(); + assert!(chunks.len() > 4); + let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap(); + assert_eq!(shred1, shred3); + assert_eq!(shred2, shred4); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_latest_index_conflict_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0, 31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let test_cases = vec![ + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidLastIndexConflict + ); + + 
let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidLastIndexConflict + ); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_erasure_meta_conflict_round_trip(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0, 31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_bigger = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 13, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_smaller = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 7, + &shredder, + &leader, + merkle_variant, + ); + + // Same fec-set, different index, different erasure meta + let test_cases = vec![ + (coding_shreds[0].clone(), coding_shreds_bigger[1].clone()), + (coding_shreds[0].clone(), coding_shreds_smaller[1].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + let chunks: Vec<_> = from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap(); + assert_eq!(shred1, shred3); + assert_eq!(shred2, shred4); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; 
"legacy")] + fn test_erasure_meta_conflict_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0, 31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_different_fec = new_rand_coding_shreds( + &mut rng, + next_shred_index + 1, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_different_fec_and_size = new_rand_coding_shreds( + &mut rng, + next_shred_index + 1, + 13, + &shredder, + &leader, + merkle_variant, + ); + + let test_cases = vec![ + // Different index, different fec set, same erasure meta + ( + coding_shreds[0].clone(), + coding_shreds_different_fec[1].clone(), + ), + // Different index, different fec set, different erasure meta + ( + coding_shreds[0].clone(), + coding_shreds_different_fec_and_size[1].clone(), + ), + // Different index, same fec set, same erasure meta + (coding_shreds[0].clone(), coding_shreds[1].clone()), + ( + coding_shreds_different_fec[0].clone(), + coding_shreds_different_fec[1].clone(), + ), + ( + coding_shreds_different_fec_and_size[0].clone(), + coding_shreds_different_fec_and_size[1].clone(), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidErasureMetaConflict + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock 
+ 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidErasureMetaConflict + ); + } + } } diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs index 7789404b970..a20757b3389 100644 --- a/gossip/src/duplicate_shred_handler.rs +++ b/gossip/src/duplicate_shred_handler.rs @@ -3,6 +3,7 @@ use { duplicate_shred::{self, DuplicateShred, Error}, duplicate_shred_listener::DuplicateShredHandlerTrait, }, + crossbeam_channel::Sender, log::error, solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, solana_runtime::bank_forks::BankForks, @@ -25,6 +26,7 @@ const MAX_NUM_ENTRIES_PER_PUBKEY: usize = 128; const BUFFER_CAPACITY: usize = 512 * MAX_NUM_ENTRIES_PER_PUBKEY; type BufferEntry = [Option; MAX_NUM_CHUNKS]; +type DuplicateSlotSender = Sender; pub struct DuplicateShredHandler { // Because we use UDP for packet transfer, we can normally only send ~1500 bytes @@ -44,6 +46,8 @@ pub struct DuplicateShredHandler { cached_on_epoch: Epoch, cached_staked_nodes: Arc>, cached_slots_in_epoch: u64, + // Used to notify duplicate consensus state machine + duplicate_slots_sender: DuplicateSlotSender, } impl DuplicateShredHandlerTrait for DuplicateShredHandler { @@ -63,6 +67,7 @@ impl DuplicateShredHandler { blockstore: Arc, leader_schedule_cache: Arc, bank_forks: Arc>, + duplicate_slots_sender: DuplicateSlotSender, ) -> Self { Self { buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(), @@ -74,6 +79,7 @@ impl DuplicateShredHandler { blockstore, leader_schedule_cache, bank_forks, + duplicate_slots_sender, } } @@ -133,6 +139,10 @@ impl DuplicateShredHandler { )?; } self.consumed.insert(slot, true); + // Notify duplicate consensus state machine + self.duplicate_slots_sender + .send(slot) + .map_err(|_| Error::DuplicateSlotSenderFailure)?; } Ok(()) } @@ -211,6 +221,8 @@ mod tests { 
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, duplicate_shred::{from_shred, tests::new_rand_shred}, }, + crossbeam_channel::unbounded, + itertools::Itertools, solana_ledger::{ genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, @@ -229,7 +241,7 @@ mod tests { slot: u64, expected_error: Option, chunk_size: usize, - ) -> Result, Error> { + ) -> Result>, Error> { let my_keypair = match expected_error { Some(Error::InvalidSignature) => Arc::new(Keypair::new()), _ => keypair, @@ -243,9 +255,6 @@ mod tests { Some(Error::SlotMismatch) => { new_rand_shred(&mut rng, next_shred_index, &shredder1, &my_keypair) } - Some(Error::ShredIndexMismatch) => { - new_rand_shred(&mut rng, next_shred_index + 1, &shredder, &my_keypair) - } Some(Error::InvalidDuplicateShreds) => shred1.clone(), _ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair), }; @@ -261,7 +270,16 @@ mod tests { timestamp(), // wallclock chunk_size, // max_size )?; - Ok(chunks) + if let Some(Error::ShredIndexMismatch) = expected_error { + Ok(Box::new(chunks.map(|mut duplicate_shred| { + if duplicate_shred.chunk_index() > 0 { + duplicate_shred.shred_index += 1 + } + duplicate_shred + }))) + } else { + Ok(Box::new(chunks)) + } } #[test] @@ -278,10 +296,12 @@ mod tests { let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank( &bank_forks.working_bank(), )); + let (sender, receiver) = unbounded(); let mut duplicate_shred_handler = DuplicateShredHandler::new( blockstore.clone(), leader_schedule_cache, Arc::new(RwLock::new(bank_forks)), + sender, ); let chunks = create_duplicate_proof( my_keypair.clone(), @@ -308,6 +328,7 @@ mod tests { } assert!(blockstore.has_duplicate_shreds_in_slot(1)); assert!(blockstore.has_duplicate_shreds_in_slot(2)); + assert_eq!(receiver.try_iter().collect_vec(), vec![1, 2]); // Test all kinds of bad proofs. 
for error in [ @@ -329,6 +350,7 @@ mod tests { duplicate_shred_handler.handle(chunk); } assert!(!blockstore.has_duplicate_shreds_in_slot(3)); + assert!(receiver.is_empty()); } } } @@ -349,8 +371,13 @@ mod tests { &bank_forks.working_bank(), )); let bank_forks_ptr = Arc::new(RwLock::new(bank_forks)); - let mut duplicate_shred_handler = - DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks_ptr); + let (sender, receiver) = unbounded(); + let mut duplicate_shred_handler = DuplicateShredHandler::new( + blockstore.clone(), + leader_schedule_cache, + bank_forks_ptr, + sender, + ); let start_slot: Slot = 1; // This proof will not be accepted because num_chunks is too large. @@ -366,6 +393,7 @@ mod tests { duplicate_shred_handler.handle(chunk); } assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot)); + assert!(receiver.is_empty()); // This proof will be rejected because the slot is too far away in the future. let future_slot = @@ -382,6 +410,7 @@ mod tests { duplicate_shred_handler.handle(chunk); } assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot)); + assert!(receiver.is_empty()); // Send in two proofs, the first proof showing up will be accepted, the following // proofs will be discarded. @@ -396,10 +425,12 @@ mod tests { // handle chunk 0 of the first proof. duplicate_shred_handler.handle(chunks.next().unwrap()); assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot)); + assert!(receiver.is_empty()); // Now send in the rest of the first proof, it will succeed. 
for chunk in chunks { duplicate_shred_handler.handle(chunk); } assert!(blockstore.has_duplicate_shreds_in_slot(start_slot)); + assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]); } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 41d4f224065..6c3895bb109 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -135,9 +135,26 @@ impl std::fmt::Display for InsertDataShredError { } } +#[derive(Eq, PartialEq, Debug, Clone)] +pub enum PossibleDuplicateShred { + Exists(Shred), // Blockstore has another shred in its spot + LastIndexConflict(/* original */ Shred, /* conflict */ Vec), // The index of this shred conflicts with `slot_meta.last_index` + ErasureConflict(/* original */ Shred, /* conflict */ Vec), // The coding shred has a conflict in the erasure_meta +} + +impl PossibleDuplicateShred { + pub fn slot(&self) -> Slot { + match self { + Self::Exists(shred) => shred.slot(), + Self::LastIndexConflict(shred, _) => shred.slot(), + Self::ErasureConflict(shred, _) => shred.slot(), + } + } +} + pub struct InsertResults { completed_data_set_infos: Vec, - duplicate_shreds: Vec, + duplicate_shreds: Vec, } /// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single @@ -1047,7 +1064,7 @@ impl Blockstore { metrics: &mut BlockstoreInsertionMetrics, ) -> Result> where - F: Fn(Shred), + F: Fn(PossibleDuplicateShred), { let InsertResults { completed_data_set_infos, @@ -1165,7 +1182,7 @@ impl Blockstore { write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, index_meta_time_us: &mut u64, - duplicate_shreds: &mut Vec, + duplicate_shreds: &mut Vec, is_trusted: bool, shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, @@ -1184,7 +1201,7 @@ impl Blockstore { if !is_trusted { if index_meta.coding().contains(shred_index) { metrics.num_coding_shreds_exists += 1; - duplicate_shreds.push(shred); + duplicate_shreds.push(PossibleDuplicateShred::Exists(shred)); return false; } @@ 
-1201,8 +1218,6 @@ impl Blockstore { .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); - // TODO: handle_duplicate is not invoked and so duplicate shreds are - // not gossiped to the rest of cluster. if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; let conflicting_shred = self.find_conflicting_coding_shred( @@ -1212,15 +1227,22 @@ impl Blockstore { just_received_shreds, ); if let Some(conflicting_shred) = conflicting_shred { - if self - .store_duplicate_if_not_existing( - slot, + if !self.has_duplicate_shreds_in_slot(slot) { + if self + .store_duplicate_slot( + slot, + conflicting_shred.clone(), + shred.payload().clone(), + ) + .is_err() + { + warn!("bad duplicate store.."); + } + + duplicate_shreds.push(PossibleDuplicateShred::ErasureConflict( + shred.clone(), conflicting_shred, - shred.payload().clone(), - ) - .is_err() - { - warn!("bad duplicate store.."); + )); } } else { datapoint_info!("bad-conflict-shred", ("slot", slot, i64)); @@ -1338,7 +1360,7 @@ impl Blockstore { just_inserted_shreds: &mut HashMap, index_meta_time_us: &mut u64, is_trusted: bool, - duplicate_shreds: &mut Vec, + duplicate_shreds: &mut Vec, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, ) -> std::result::Result, InsertDataShredError> { @@ -1360,7 +1382,7 @@ impl Blockstore { if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { - duplicate_shreds.push(shred); + duplicate_shreds.push(PossibleDuplicateShred::Exists(shred)); return Err(InsertDataShredError::Exists); } @@ -1385,6 +1407,7 @@ impl Blockstore { &self.last_root, leader_schedule, shred_source, + duplicate_shreds, ) { return Err(InsertDataShredError::InvalidShred); } @@ -1466,6 +1489,7 @@ impl Blockstore { last_root: &RwLock, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, + duplicate_shreds: &mut Vec, ) -> bool { let shred_index = u64::from(shred.index()); let slot = 
shred.slot(); @@ -1483,21 +1507,25 @@ impl Blockstore { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_shreds, - slot, - last_index.unwrap(), - ); + if !self.has_duplicate_shreds_in_slot(slot) { + let ending_shred: Vec = self + .get_data_shred_from_just_inserted_or_db( + just_inserted_shreds, + slot, + last_index.unwrap(), + ) + .into_owned(); - if self - .store_duplicate_if_not_existing( - slot, - ending_shred.into_owned(), - shred.payload().clone(), - ) - .is_err() - { - warn!("store duplicate error"); + if self + .store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone()) + .is_err() + { + warn!("store duplicate error"); + } + duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict( + shred.clone(), + ending_shred, + )); } datapoint_error!( @@ -1518,21 +1546,25 @@ impl Blockstore { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_shreds, - slot, - slot_meta.received - 1, - ); + if !self.has_duplicate_shreds_in_slot(slot) { + let ending_shred: Vec = self + .get_data_shred_from_just_inserted_or_db( + just_inserted_shreds, + slot, + slot_meta.received - 1, + ) + .into_owned(); - if self - .store_duplicate_if_not_existing( - slot, - ending_shred.into_owned(), - shred.payload().clone(), - ) - .is_err() - { - warn!("store duplicate error"); + if self + .store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone()) + .is_err() + { + warn!("store duplicate error"); + } + duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict( + shred.clone(), + ending_shred, + )); } datapoint_error!( @@ -3224,19 +3256,6 @@ impl Blockstore { self.duplicate_slots_cf.delete(slot) } - pub fn store_duplicate_if_not_existing( - &self, - slot: Slot, - 
shred1: Vec, - shred2: Vec, - ) -> Result<()> { - if !self.has_duplicate_shreds_in_slot(slot) { - self.store_duplicate_slot(slot, shred1, shred2) - } else { - Ok(()) - } - } - pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> { let mut iter = self .db @@ -6571,6 +6590,7 @@ pub mod tests { &last_root, None, ShredSource::Repaired, + &mut Vec::new(), )); // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 @@ -6587,6 +6607,7 @@ pub mod tests { panic!("Shred in unexpected format") } }; + let mut duplicate_shreds = vec![]; assert!(!blockstore.should_insert_data_shred( &shred7, &slot_meta, @@ -6594,8 +6615,15 @@ pub mod tests { &last_root, None, ShredSource::Repaired, + &mut duplicate_shreds, )); assert!(blockstore.has_duplicate_shreds_in_slot(0)); + assert_eq!(duplicate_shreds.len(), 1); + assert!(matches!( + duplicate_shreds[0], + PossibleDuplicateShred::LastIndexConflict(_, _) + )); + assert_eq!(duplicate_shreds[0].slot(), 0); // Insert all pending shreds let mut shred8 = shreds[8].clone(); @@ -6604,18 +6632,30 @@ pub mod tests { // Trying to insert a shred with index > the "is_last" shred should fail if shred8.is_data() { - shred8.set_slot(slot_meta.last_index.unwrap() + 1); + shred8.set_index((slot_meta.last_index.unwrap() + 1) as u32); } else { panic!("Shred in unexpected format") } + duplicate_shreds.clear(); + blockstore.duplicate_slots_cf.delete(0).unwrap(); + assert!(!blockstore.has_duplicate_shreds_in_slot(0)); assert!(!blockstore.should_insert_data_shred( - &shred7, + &shred8, &slot_meta, &HashMap::new(), &last_root, None, ShredSource::Repaired, + &mut duplicate_shreds, )); + + assert_eq!(duplicate_shreds.len(), 1); + assert!(matches!( + duplicate_shreds[0], + PossibleDuplicateShred::LastIndexConflict(_, _) + )); + assert_eq!(duplicate_shreds[0].slot(), 0); + assert!(blockstore.has_duplicate_shreds_in_slot(0)); } #[test] @@ -6701,7 +6741,10 @@ pub mod tests { ShredSource::Turbine, &mut
BlockstoreInsertionMetrics::default(), )); - assert_eq!(duplicate_shreds, vec![coding_shred]); + assert_eq!( + duplicate_shreds, + vec![PossibleDuplicateShred::Exists(coding_shred)] + ); } #[test] diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 76cf2b3de88..24a4fef0fa4 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -322,7 +322,7 @@ impl SlotMeta { } impl ErasureMeta { - pub(crate) fn from_coding_shred(shred: &Shred) -> Option { + pub fn from_coding_shred(shred: &Shred) -> Option { match shred.shred_type() { ShredType::Data => None, ShredType::Code => { @@ -344,7 +344,7 @@ impl ErasureMeta { // Returns true if the erasure fields on the shred // are consistent with the erasure-meta. - pub(crate) fn check_coding_shred(&self, shred: &Shred) -> bool { + pub fn check_coding_shred(&self, shred: &Shred) -> bool { let Some(mut other) = Self::from_coding_shred(shred) else { return false; };