diff --git a/core/src/repair/mod.rs b/core/src/repair/mod.rs index 606015d778..9f248b2bf2 100644 --- a/core/src/repair/mod.rs +++ b/core/src/repair/mod.rs @@ -16,4 +16,5 @@ pub mod request_response; pub mod result; pub mod serve_repair; pub mod serve_repair_service; +pub mod shred_resolver_service; pub(crate) mod standard_repair_handler; diff --git a/core/src/repair/shred_resolver_service.rs b/core/src/repair/shred_resolver_service.rs new file mode 100644 index 0000000000..b3db3f25eb --- /dev/null +++ b/core/src/repair/shred_resolver_service.rs @@ -0,0 +1,51 @@ +//! The shred resolver service listens to events emitted by +//! +//! Blockstore: +//! - FEC set has been ingested or successfully completed +//! - A conflicting shred has been ingested +//! +//! Votor: +//! - A block has reached "canonical" status, where canonical +//! means that it is the unique block of interest in the slot. +//! This is a result of the block or a descendant receiving +//! a Notarize, FastFinalizeation, or SlowFinalization certificate +//! - An alternate version of a block has been requested in order to +//! progress replay or check safe to notar conditions. We must fetch +//! these shreds, but it is not yet clear if this is the canonical block. +//! +//! Using these events it plans how to fetch the correct shreds and resolves +//! any incorrect shreds ingested. It interfaces with repair to send out +//! the appropriate requests. + +use { + super::repair_service::OutstandingShredRepairs, + solana_ledger::{blockstore::Blockstore, shred_event::ShredEventReceiver}, + std::{ + sync::{Arc, RwLock}, + thread::{self, JoinHandle}, + }, +}; + +pub struct ShredResolverService { + t_listen: JoinHandle<()>, +} + +impl ShredResolverService { + pub fn new( + _blockstore: Arc, + event_receiver: ShredEventReceiver, + _outstanding_repairs: Arc>, + ) -> Self { + let t_listen = thread::spawn(move || loop { + let Ok(_events) = event_receiver.recv() else { + break; + }; + }); + + Self { t_listen } + } + + pub fn join(self) -> thread::Result<()> { + self.t_listen.join() + } +} diff --git a/core/src/window_service.rs b/core/src/window_service.rs index a1cb5975e1..6cc3cc7b88 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -10,11 +10,12 @@ use { repair_service::{ OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels, }, + shred_resolver_service::ShredResolverService, }, result::{Error, Result}, }, agave_feature_set as feature_set, - crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, + crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError, Sender}, rayon::{prelude::*, ThreadPool}, solana_clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_gossip::cluster_info::ClusterInfo, @@ -22,7 +23,8 @@ use { blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred}, blockstore_meta::BlockLocation, leader_schedule_cache::LeaderScheduleCache, - shred::{self, ReedSolomonCache, Shred}, + shred::{self, ReedSolomonCache, Shred, MAX_DATA_SHREDS_PER_SLOT}, + shred_event::ShredEventSender, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, @@ -199,6 +201,7 @@ fn run_insert( ws_metrics: &mut WindowServiceMetrics, completed_data_sets_sender: Option<&CompletedDataSetsSender>, retransmit_sender: &EvictingSender>, + shred_event_sender: &ShredEventSender, reed_solomon_cache: &ReedSolomonCache, accept_repairs_only: bool, ) -> Result<()> @@ -238,6 +241,7 @@ where false, // is_trusted retransmit_sender, &handle_duplicate, + Some(shred_event_sender), reed_solomon_cache, metrics, )?; @@ -280,6 +284,7 @@ pub(crate) struct WindowService { t_check_duplicate: JoinHandle<()>, repair_service: RepairService, certificate_service: CertificateService, + shred_resolver_service: ShredResolverService, } impl WindowService { @@ -322,6 +327,13 @@ impl WindowService { let certificate_service = CertificateService::new(exit.clone(), blockstore.clone(), certificate_receiver); + let (shred_event_sender, shred_event_receiver) = bounded(MAX_DATA_SHREDS_PER_SLOT); + let shred_resolver_service = ShredResolverService::new( + blockstore.clone(), + shred_event_receiver, + outstanding_repair_requests.clone(), + ); + let (duplicate_sender, duplicate_receiver) = unbounded(); let t_check_duplicate = Self::start_check_duplicate_thread( @@ -341,6 +353,7 @@ impl WindowService { duplicate_sender, completed_data_sets_sender, retransmit_sender, + shred_event_sender, accept_repairs_only, ); @@ -349,6 +362,7 @@ impl WindowService { t_check_duplicate, repair_service, certificate_service, + shred_resolver_service, } } @@ -391,6 +405,7 @@ impl WindowService { check_duplicate_sender: Sender, completed_data_sets_sender: Option, retransmit_sender: EvictingSender>, + shred_event_sender: ShredEventSender, accept_repairs_only: bool, ) -> JoinHandle<()> { let handle_error = || { @@ -426,6 +441,7 @@ impl WindowService { &mut ws_metrics, completed_data_sets_sender.as_ref(), &retransmit_sender, + &shred_event_sender, &reed_solomon_cache, accept_repairs_only, ) { @@ -467,7 +483,8 @@ impl WindowService { self.t_insert.join()?; self.t_check_duplicate.join()?; self.repair_service.join()?; - self.certificate_service.join() + self.certificate_service.join()?; + self.shred_resolver_service.join() } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 45ce96ad7d..d9175a0033 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -18,9 +18,10 @@ use { leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ - self, ErasureSetId, ProcessShredsStats, ReedSolomonCache, Shred, ShredData, ShredId, - ShredType, Shredder, DATA_SHREDS_PER_FEC_BLOCK, + self, ErasureSetId, ProcessShredsStats, ReedSolomonCache, Shred, ShredData, ShredFlags, + ShredId, ShredType, Shredder, DATA_SHREDS_PER_FEC_BLOCK, }, + shred_event::{ShredEvent, ShredEventSender}, slot_stats::{ShredSource, SlotsStats}, transaction_address_lookup_table_scanner::scan_transaction, }, @@ -219,6 +220,7 @@ impl LastFECSetCheckResults { pub struct InsertResults { completed_data_set_infos: Vec, duplicate_shreds: Vec, + shred_events: Vec, } /// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single @@ -333,6 +335,8 @@ struct ShredInsertionTracker<'a> { index_meta_time_us: u64, // Collection of recently completed data sets (data portion of erasure batch) newly_completed_data_sets: Vec, + // Collection of shred events to be sent to the shred resolver service + shred_events: Vec, } impl ShredInsertionTracker<'_> { @@ -347,6 +351,7 @@ impl ShredInsertionTracker<'_> { write_batch, index_meta_time_us: 0, newly_completed_data_sets: vec![], + shred_events: vec![], } } } @@ -1260,10 +1265,6 @@ impl Blockstore { continue; } let (slot, _) = erasure_set.store_key(); - if self.has_duplicate_shreds_in_slot(slot) { - // TODO(ashwin): continue checking to notify the shred resolver - continue; - } // First coding shred from this erasure batch, check the forward merkle root chaining let erasure_meta = working_erasure_meta.as_ref(); let shred_id = ShredId::new( @@ -1285,6 +1286,7 @@ impl Blockstore { &shred_insertion_tracker.just_inserted_shreds, &shred_insertion_tracker.merkle_root_metas, &mut shred_insertion_tracker.duplicate_shreds, + &mut shred_insertion_tracker.shred_events, ); } @@ -1296,10 +1298,6 @@ impl Blockstore { continue; } let (slot, _) = erasure_set.store_key(); - if self.has_duplicate_shreds_in_slot(slot) { - // TODO(ashwin): continue checking to notify the shred resolver - continue; - } // First shred from this erasure batch, check the backwards merkle root chaining let merkle_root_meta = working_merkle_root_meta.as_ref(); let shred_id = ShredId::new( @@ -1318,6 +1316,7 @@ impl Blockstore { &shred_insertion_tracker.just_inserted_shreds, &shred_insertion_tracker.erasure_metas, &mut shred_insertion_tracker.duplicate_shreds, + &mut shred_insertion_tracker.shred_events, ); } } @@ -1519,6 +1518,7 @@ impl Blockstore { Ok(InsertResults { completed_data_set_infos: shred_insertion_tracker.newly_completed_data_sets, duplicate_shreds: shred_insertion_tracker.duplicate_shreds, + shred_events: shred_insertion_tracker.shred_events, }) } @@ -1548,6 +1548,7 @@ impl Blockstore { is_trusted, retransmit_sender, handle_duplicate, + None, reed_solomon_cache, metrics, ) @@ -1568,6 +1569,7 @@ impl Blockstore { is_trusted: bool, retransmit_sender: &EvictingSender>, handle_duplicate: &F, + shred_event_sender: Option<&ShredEventSender>, reed_solomon_cache: &ReedSolomonCache, metrics: &mut BlockstoreInsertionMetrics, ) -> Result> @@ -1577,6 +1579,7 @@ impl Blockstore { let InsertResults { completed_data_set_infos, duplicate_shreds, + shred_events, } = self.do_insert_shreds( shreds, leader_schedule, @@ -1589,6 +1592,13 @@ impl Blockstore { handle_duplicate(shred); } + if let Some(shred_event_sender) = shred_event_sender { + for event in shred_events { + // TODO: handle error + let _ = shred_event_sender.send(event); + } + } + Ok(completed_data_set_infos) } @@ -1755,6 +1765,7 @@ impl Blockstore { merkle_root_meta.as_ref(), &shred, duplicate_shreds, + &mut shred_insertion_tracker.shred_events, ) { return false; } @@ -1838,9 +1849,21 @@ impl Blockstore { index_meta_working_set_entry.did_insert_occur = true; metrics.num_inserted += 1; - merkle_root_metas - .entry((location, erasure_set)) - .or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred))); + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry((location, erasure_set)) { + // This is the first shred from this FEC set, db was checked above, create a meta + let merkle_root_meta = MerkleRootMeta::from_shred(&shred); + entry.insert(WorkingEntry::Dirty(merkle_root_meta)); + + shred_insertion_tracker + .shred_events + .push(ShredEvent::NewFECSet { + location, + slot: shred.slot(), + fec_set_index: erasure_set.fec_set_index(), + merkle_root: merkle_root_meta.merkle_root().unwrap(), + chained_merkle_root: shred.chained_merkle_root().unwrap(), + }); + } } if let HashMapEntry::Vacant(entry) = just_inserted_shreds.entry((location, shred.id())) { @@ -1851,6 +1874,25 @@ impl Blockstore { result } + /// For a complete fec set `erasure_set`, check if it is the last set in the slot. + /// Note: assumes that the fec set is complete (all data shreds are ingested) + fn is_complete_fec_set_last_in_slot( + &self, + erasure_set: ErasureSetId, + location: BlockLocation, + just_inserted_shreds: &HashMap<(BlockLocation, ShredId), Cow<'_, Shred>>, + ) -> bool { + let last_data_shred_index = shred::last_data_shred_index(erasure_set.fec_set_index()); + let shred_id = ShredId::new(erasure_set.slot(), last_data_shred_index, ShredType::Data); + + let shred = self + .get_shred_from_just_inserted_or_db(just_inserted_shreds, shred_id, location) + .expect("FEC set must be complete to check last in slot"); + shred::layout::get_flags(&shred) + .expect("Previously inserted shred must be valid") + .contains(ShredFlags::LAST_SHRED_IN_SLOT) + } + fn find_conflicting_coding_shred<'a>( &'a self, shred: &Shred, @@ -1940,6 +1982,7 @@ impl Blockstore { erasure_metas, write_batch, newly_completed_data_sets, + .. } = shred_insertion_tracker; let index_meta_working_set_entry = @@ -1956,6 +1999,7 @@ impl Blockstore { let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); let erasure_set = shred.erasure_set(); + let fec_set_index = erasure_set.fec_set_index(); if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry((location, erasure_set)) { if let Some(meta) = self .merkle_root_meta_from_location(erasure_set, location) @@ -2015,6 +2059,7 @@ impl Blockstore { merkle_root_meta.as_ref(), &shred, duplicate_shreds, + &mut shred_insertion_tracker.shred_events, ) { // This indicates there is an alternate version of this block. // Similar to the last index case above, we might never get all the @@ -2030,25 +2075,55 @@ impl Blockstore { } } - let completed_data_sets = self.insert_data_shred( - slot_meta, - index_meta.data_mut(), - &shred, - location, - write_batch, - shred_source, - )?; - if matches!(location, BlockLocation::Original) { - // We don't currently notify RPC when we complete data sets in alternate columns. This can be extended in the future - // if necessary. - newly_completed_data_sets.extend(completed_data_sets); - } - merkle_root_metas - .entry((location, erasure_set)) - .or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred))); + { + let completed_data_sets = self.insert_data_shred( + slot_meta, + index_meta.data_mut(), + &shred, + location, + write_batch, + shred_source, + )?; + + if matches!(location, BlockLocation::Original) { + // We don't currently notify RPC when we complete data sets in alternate columns. This can be extended in the future + // if necessary. + newly_completed_data_sets.extend(completed_data_sets); + } + } + + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry((location, erasure_set)) { + // This is the first shred from this FEC set, db was checked above, create a meta + let merkle_root_meta = MerkleRootMeta::from_shred(&shred); + entry.insert(WorkingEntry::Dirty(merkle_root_meta)); + + shred_insertion_tracker + .shred_events + .push(ShredEvent::NewFECSet { + location, + slot, + fec_set_index, + merkle_root: merkle_root_meta.merkle_root().unwrap(), + chained_merkle_root: shred.chained_merkle_root().unwrap(), + }); + } just_inserted_shreds.insert((location, shred.id()), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; + + if Index::is_data_set_complete(fec_set_index, index_meta) { + let is_last_in_slot = + self.is_complete_fec_set_last_in_slot(erasure_set, location, just_inserted_shreds); + shred_insertion_tracker + .shred_events + .push(ShredEvent::CompletedFECSet { + location, + slot, + fec_set_index, + is_last_in_slot, + }); + } + if let BTreeMapEntry::Vacant(entry) = erasure_metas.entry((location, erasure_set)) { if let Some(meta) = self .erasure_meta_from_location(erasure_set, location) @@ -2137,6 +2212,7 @@ impl Blockstore { merkle_root_meta: &MerkleRootMeta, shred: &Shred, duplicate_shreds: &mut Vec, + shred_events: &mut Vec, ) -> bool { let new_merkle_root = shred.merkle_root().ok(); if merkle_root_meta.merkle_root() == new_merkle_root { @@ -2157,6 +2233,14 @@ impl Blockstore { shred.shred_type(), ); + shred_events.push(ShredEvent::MerkleRootConflict { + location, + slot, + fec_set_index: shred.erasure_set().fec_set_index(), + conflicting_shred_index: shred.index(), + conflicting_shred_type: shred.shred_type(), + }); + if !self.has_duplicate_shreds_in_slot(slot) { let shred_id = ShredId::new( slot, @@ -2210,6 +2294,7 @@ impl Blockstore { just_inserted_shreds: &HashMap<(BlockLocation, ShredId), Cow<'_, Shred>>, merkle_root_metas: &HashMap<(BlockLocation, ErasureSetId), WorkingEntry>, duplicate_shreds: &mut Vec, + shred_events: &mut Vec, ) -> bool { debug_assert!(erasure_meta.check_coding_shred(shred)); let slot = shred.slot(); @@ -2267,6 +2352,14 @@ impl Blockstore { next_merkle_root_meta.first_received_shred_type(), ); + shred_events.push(ShredEvent::ChainedMerkleRootConflict { + location, + slot, + fec_set_index: erasure_set.fec_set_index(), + merkle_root: merkle_root.unwrap_or_default(), + chained_merkle_root: chained_merkle_root.unwrap_or_default(), + }); + if !self.has_duplicate_shreds_in_slot(shred.slot()) { duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict( shred.clone(), @@ -2295,16 +2388,16 @@ impl Blockstore { just_inserted_shreds: &HashMap<(BlockLocation, ShredId), Cow<'_, Shred>>, erasure_metas: &BTreeMap<(BlockLocation, ErasureSetId), WorkingEntry>, duplicate_shreds: &mut Vec, + shred_events: &mut Vec, ) -> bool { let slot = shred.slot(); let erasure_set = shred.erasure_set(); let fec_set_index = shred.fec_set_index(); if fec_set_index == 0 { - // Although the first fec set chains to the last fec set of the parent block, - // if this chain is incorrect we do not know which block is the duplicate until votes - // are received. We instead delay this check until the block reaches duplicate - // confirmation. + // The first fec set chains to the last fec set of the parent block. + // This chaining across block boundary is checked in the shred resolver and used + // in replay return true; } @@ -2356,6 +2449,14 @@ impl Blockstore { shred.shred_type(), ); + shred_events.push(ShredEvent::ChainedMerkleRootConflict { + location, + slot, + fec_set_index: prev_erasure_set.fec_set_index(), + merkle_root: merkle_root.unwrap_or_default(), + chained_merkle_root: chained_merkle_root.unwrap_or_default(), + }); + if !self.has_duplicate_shreds_in_slot(shred.slot()) { duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict( shred.clone(), @@ -8131,7 +8232,12 @@ pub mod tests { let slot = 1; let (_data_shreds, code_shreds, _) = setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( - slot, 0, 10, 0, None, true, + slot, + 0, + 10, + 0, + Some(Hash::default()), + true, ); let coding_shred = code_shreds[0].clone(); @@ -8170,7 +8276,12 @@ pub mod tests { let slot = 1; let (_data_shreds, code_shreds, _) = setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( - slot, 0, 10, 0, None, true, + slot, + 0, + 10, + 0, + Some(Hash::default()), + true, ); let coding_shred = code_shreds[0].clone(); @@ -12192,7 +12303,7 @@ pub mod tests { parent_slot, 10, fec_set_index, - None, + Some(Hash::default()), false, ); let merkle_root = first_data_shreds[0].merkle_root().unwrap(); @@ -12233,7 +12344,7 @@ pub mod tests { parent_slot, 100, fec_set_index, - None, + Some(Hash::default()), false, ); let merkle_root = first_data_shreds[0].merkle_root().unwrap(); @@ -12265,28 +12376,6 @@ pub mod tests { let results = blockstore.check_last_fec_set(slot).unwrap(); assert_eq!(results.last_fec_set_merkle_root, Some(merkle_root)); assert!(!results.is_retransmitter_signed); - blockstore.run_purge(slot, slot, PurgeType::Exact).unwrap(); - - // Slot is full, but does not contain retransmitter shreds - let fec_set_index = 0; - let (first_data_shreds, _, _) = - setup_erasure_shreds_with_index_and_chained_merkle_and_last_in_slot( - slot, - parent_slot, - 200, - fec_set_index, - // Do not set merkle root, so shreds are not signed - None, - true, - ); - assert!(first_data_shreds.len() > DATA_SHREDS_PER_FEC_BLOCK); - let block_id = first_data_shreds.last().unwrap().merkle_root().unwrap(); - blockstore - .insert_shreds(first_data_shreds, None, false) - .unwrap(); - let results = blockstore.check_last_fec_set(slot).unwrap(); - assert_eq!(results.last_fec_set_merkle_root, Some(block_id)); - assert!(!results.is_retransmitter_signed); } #[test] diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index cd24f843f1..296d8ed171 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -2,7 +2,7 @@ use { crate::{ bit_vec::BitVec, blockstore::BlockstoreError, - shred::{self, Shred, ShredType, MAX_DATA_SHREDS_PER_SLOT}, + shred::{self, Shred, ShredType, DATA_SHREDS_PER_FEC_BLOCK, MAX_DATA_SHREDS_PER_SLOT}, }, bincode::Options, bitflags::bitflags, @@ -514,6 +514,13 @@ impl Index { pub(crate) fn coding_mut(&mut self) -> &mut ShredIndex { &mut self.coding } + + /// Checks the index to see if we have all the data shreds in this FEC set + pub(crate) fn is_data_set_complete(fec_set_index: u32, index: &Index) -> bool { + let data_indices = + (fec_set_index as usize)..(fec_set_index as usize + DATA_SHREDS_PER_FEC_BLOCK); + index.data().count_ones_in_range(data_indices) == DATA_SHREDS_PER_FEC_BLOCK + } } #[cfg(test)] @@ -631,6 +638,13 @@ impl ShredIndexV2 { .map(|idx| idx as u64) } + pub(crate) fn count_ones_in_range(&self, range: R) -> usize + where + R: RangeBounds, + { + self.index.range(range).count_ones() + } + fn iter(&self) -> impl Iterator + '_ { self.range(0..MAX_DATA_SHREDS_PER_SLOT as u64) } diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index cc492f7917..6d06ca6bd7 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -28,6 +28,7 @@ pub mod leader_schedule_utils; pub mod next_slots_iterator; pub mod rooted_slot_iterator; conditional_mod::conditional_vis_mod!(shred, feature="agave-unstable-api", pub,pub(crate)); +conditional_mod::conditional_vis_mod!(shred_event, feature="agave-unstable-api", pub,pub(crate)); mod shredder; pub mod sigverify_shreds; pub mod slot_stats; diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index a0e1ffa9c8..bb0c8a335d 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -330,6 +330,10 @@ impl ErasureSetId { self.0 } + pub(crate) fn fec_set_index(&self) -> u32 { + self.1 + } + // Storage key for ErasureMeta and MerkleRootMeta in blockstore db. // Note: ErasureMeta column uses u64 so this will need to be typecast pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u32) { @@ -833,6 +837,11 @@ where false } +/// Returns the index of the last data shred in this FEC set +pub(crate) fn last_data_shred_index(fec_set_index: u32) -> u32 { + fec_set_index + (DATA_SHREDS_PER_FEC_BLOCK as u32) - 1 +} + pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { let ticks = create_ticks(1, 0, Hash::default()); max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size) diff --git a/ledger/src/shred_event.rs b/ledger/src/shred_event.rs new file mode 100644 index 0000000000..15db1ff2b8 --- /dev/null +++ b/ledger/src/shred_event.rs @@ -0,0 +1,55 @@ +//! Events that can be sent from the blockstore ingest thread +//! to the shred resolver service, for informing logic that decides +//! to dump or repair shreds. + +use { + crate::{blockstore_meta::BlockLocation, shred::ShredType}, + crossbeam_channel::{Receiver, Sender}, + solana_clock::Slot, + solana_hash::Hash, +}; + +pub type ShredEventSender = Sender; +pub type ShredEventReceiver = Receiver; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ShredEvent { + /// The FEC set at `(slot, fec_set_index)` in blockstore location `location` has received all data shreds + CompletedFECSet { + location: BlockLocation, + slot: Slot, + fec_set_index: u32, + is_last_in_slot: bool, + }, + + /// We have observed a data or coding shred from `(slot, fec_set_index)` in blockstore location `location`, + /// for the first time. + NewFECSet { + location: BlockLocation, + slot: Slot, + fec_set_index: u32, + merkle_root: Hash, + chained_merkle_root: Hash, + }, + + /// We have observed conflicting shreds in `(slot, fec_set_index)` in blockstore location `location`. + /// The conflicting shred is of type `conflicting_shred_type` for index `conflicting_shred_index`. + MerkleRootConflict { + location: BlockLocation, + slot: Slot, + fec_set_index: u32, + conflicting_shred_index: u32, + conflicting_shred_type: ShredType, + }, + + /// We have observed incorrectly chained shreds in `slot` across two fec sets. + /// The `merkle_root` of `fec_set_index` does not match the `chained_merkle_root` + /// of the next fec set + ChainedMerkleRootConflict { + location: BlockLocation, + slot: Slot, + fec_set_index: u32, + merkle_root: Hash, + chained_merkle_root: Hash, + }, +}