6 changes: 3 additions & 3 deletions core/src/retransmit_stage.rs
@@ -20,7 +20,7 @@ use {
     solana_ledger::{
         blockstore::Blockstore,
         leader_schedule_cache::LeaderScheduleCache,
-        shred::{Shred, ShredType},
+        shred::{Shred, ShredId},
     },
     solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
@@ -140,13 +140,13 @@ impl RetransmitStats {
 }
 
 // Map of shred (slot, index, type) => list of hash values seen for that key.
-type ShredFilter = LruCache<(Slot, u32, ShredType), Vec<u64>>;
+type ShredFilter = LruCache<ShredId, Vec<u64>>;
 
 type ShredFilterAndHasher = (ShredFilter, PacketHasher);
 
 // Returns true if shred is already received and should skip retransmit.
 fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
-    let key = (shred.slot(), shred.index(), shred.shred_type());
+    let key = shred.id();
     let mut shreds_received = shreds_received.lock().unwrap();
     let (cache, hasher) = shreds_received.deref_mut();
     match cache.get_mut(&key) {
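Note on the new key type: ShredId itself is defined in ledger/src/shred.rs and is not part of this changeset. From the call sites in this diff (ShredId::new(...), shred.id(), the shred.unwrap() destructuring in is_shred_duplicate, and its use as an LruCache/HashMap key), it is presumably a small copyable newtype along these lines; the exact definition, derives, and visibility are assumptions:

// Sketch only -- inferred from call sites in this diff, not copied from
// ledger/src/shred.rs.
// ShredId uniquely identifies a shred by (slot, shred index, shred type).
// Deriving Hash + Eq is what lets one type serve both as the LruCache key
// in retransmit_stage.rs and as the HashMap key in blockstore.rs.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);

impl ShredId {
    pub fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
        ShredId(slot, index, shred_type)
    }

    // Recovers the (slot, index, shred_type) triple, as used by
    // Blockstore::is_shred_duplicate below.
    pub fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
        (self.0, self.1, self.2)
    }
}

impl Shred {
    // Unique identifier for this shred; replaces the ad-hoc
    // (slot(), index(), shred_type()) tuples used before this change.
    pub fn id(&self) -> ShredId {
        ShredId::new(self.slot(), self.index(), self.shred_type())
    }
}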
9 changes: 3 additions & 6 deletions core/src/window_service.rs
@@ -217,12 +217,9 @@ fn run_check_duplicate(
     let check_duplicate = |shred: Shred| -> Result<()> {
         let shred_slot = shred.slot();
         if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
-            if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(
-                shred_slot,
-                shred.index(),
-                shred.payload.clone(),
-                shred.shred_type(),
-            ) {
+            if let Some(existing_shred_payload) =
+                blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
+            {
                 cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
                 blockstore.store_duplicate_slot(
                     shred_slot,
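With this change, the (slot, index, shred_type) triple travels as a single ShredId value instead of three loose arguments. A minimal usage sketch of the new Blockstore::is_shred_duplicate call shape (assuming blockstore and shred are in scope, as in check_duplicate above):

// Usage sketch: ask the blockstore whether a different shred with the same
// ShredId has already been stored.
if let Some(existing_payload) =
    blockstore.is_shred_duplicate(shred.id(), shred.payload.clone())
{
    // Some(_) means the stored payload differs from this shred's payload,
    // i.e. the leader produced two distinct shreds for one (slot, index, type).
    assert_ne!(existing_payload, shred.payload);
}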
126 changes: 60 additions & 66 deletions ledger/src/blockstore.rs
@@ -13,7 +13,7 @@ use {
         entry::{create_ticks, Entry},
         leader_schedule_cache::LeaderScheduleCache,
         next_slots_iterator::NextSlotsIterator,
-        shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
+        shred::{Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
     },
     bincode::deserialize,
     log::*,
@@ -635,12 +635,13 @@ impl Blockstore {
         index: &'a Index,
         slot: Slot,
         erasure_meta: &'a ErasureMeta,
-        prev_inserted_datas: &'a mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
         data_cf: &'a LedgerColumn<cf::ShredData>,
     ) -> impl Iterator<Item = Shred> + 'a {
         erasure_meta.data_shreds_indices().filter_map(move |i| {
-            if let Some(shred) = prev_inserted_datas.remove(&(slot, i)) {
-                return Some(shred);
+            let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data);
+            if let Some(shred) = prev_inserted_shreds.get(&key) {
+                return Some(shred.clone());
             }
             if !index.data().is_present(i) {
                 return None;
@@ -656,14 +657,15 @@
     }
 
     fn get_recovery_coding_shreds<'a>(
-        index: &'a mut Index,
+        index: &'a Index,
         slot: Slot,
         erasure_meta: &'a ErasureMeta,
-        prev_inserted_codes: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
         code_cf: &'a LedgerColumn<cf::ShredCode>,
     ) -> impl Iterator<Item = Shred> + 'a {
         erasure_meta.coding_shreds_indices().filter_map(move |i| {
-            if let Some(shred) = prev_inserted_codes.get(&(slot, i)) {
+            let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code);
+            if let Some(shred) = prev_inserted_shreds.get(&key) {
                 return Some(shred.clone());
             }
             if !index.coding().is_present(i) {
@@ -682,24 +684,28 @@
     fn recover_shreds(
         index: &mut Index,
         erasure_meta: &ErasureMeta,
-        prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
-        prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        prev_inserted_shreds: &HashMap<ShredId, Shred>,
         recovered_data_shreds: &mut Vec<Shred>,
         data_cf: &LedgerColumn<cf::ShredData>,
         code_cf: &LedgerColumn<cf::ShredCode>,
     ) {
         // Find shreds for this erasure set and try recovery
         let slot = index.slot;
-        let mut available_shreds: Vec<_> =
-            Self::get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_datas, data_cf)
-                .collect();
-        available_shreds.extend(Self::get_recovery_coding_shreds(
+        let available_shreds: Vec<_> = Self::get_recovery_data_shreds(
+            index,
+            slot,
+            erasure_meta,
+            prev_inserted_shreds,
+            data_cf,
+        )
+        .chain(Self::get_recovery_coding_shreds(
             index,
             slot,
             erasure_meta,
-            prev_inserted_codes,
+            prev_inserted_shreds,
             code_cf,
-        ));
+        ))
+        .collect();
         if let Ok(mut result) = Shredder::try_recovery(available_shreds) {
             Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
             recovered_data_shreds.append(&mut result);
@@ -733,8 +739,7 @@ impl Blockstore {
         db: &Database,
         erasure_metas: &HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
-        prev_inserted_datas: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
-        prev_inserted_codes: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        prev_inserted_shreds: &HashMap<ShredId, Shred>,
     ) -> Vec<Shred> {
         let data_cf = db.column::<cf::ShredData>();
         let code_cf = db.column::<cf::ShredCode>();
@@ -752,8 +757,7 @@
                 Self::recover_shreds(
                     index,
                     erasure_meta,
-                    prev_inserted_datas,
-                    prev_inserted_codes,
+                    prev_inserted_shreds,
                     &mut recovered_data_shreds,
                     &data_cf,
                     &code_cf,
@@ -799,8 +803,7 @@
         let db = &*self.db;
         let mut write_batch = db.batch()?;
 
-        let mut just_inserted_coding_shreds = HashMap::new();
-        let mut just_inserted_data_shreds = HashMap::new();
+        let mut just_inserted_shreds = HashMap::with_capacity(shreds.len());
         let mut erasure_metas = HashMap::new();
         let mut slot_meta_working_set = HashMap::new();
         let mut index_working_set = HashMap::new();
@@ -824,7 +827,7 @@
                     &mut index_working_set,
                     &mut slot_meta_working_set,
                     &mut write_batch,
-                    &mut just_inserted_data_shreds,
+                    &mut just_inserted_shreds,
                     &mut index_meta_time,
                     is_trusted,
                     handle_duplicate,
@@ -851,7 +854,7 @@
                     &mut erasure_metas,
                     &mut index_working_set,
                     &mut write_batch,
-                    &mut just_inserted_coding_shreds,
+                    &mut just_inserted_shreds,
                     &mut index_meta_time,
                     handle_duplicate,
                     is_trusted,
@@ -870,8 +873,7 @@
                 db,
                 &erasure_metas,
                 &mut index_working_set,
-                &mut just_inserted_data_shreds,
-                &just_inserted_coding_shreds,
+                &just_inserted_shreds,
             );
 
             metrics.num_recovered += recovered_data_shreds.len();
@@ -890,7 +892,7 @@
                     &mut index_working_set,
                     &mut slot_meta_working_set,
                     &mut write_batch,
-                    &mut just_inserted_data_shreds,
+                    &mut just_inserted_shreds,
                     &mut index_meta_time,
                     is_trusted,
                     &handle_duplicate,
@@ -1017,6 +1019,8 @@ impl Blockstore {
     }
 
     fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
+        // TODO should also compare first-coding-index once position field is
+        // populated across cluster.
         shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
             || shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
     }
@@ -1028,7 +1032,7 @@
         erasure_metas: &mut HashMap<(Slot, /*fec set index:*/ u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         write_batch: &mut WriteBatch,
-        just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        just_received_shreds: &mut HashMap<ShredId, Shred>,
         index_meta_time: &mut u64,
         handle_duplicate: &F,
         is_trusted: bool,
@@ -1077,7 +1081,7 @@
                 &shred,
                 slot,
                 erasure_meta,
-                just_received_coding_shreds,
+                just_received_shreds,
             );
             if let Some(conflicting_shred) = conflicting_shred {
                 if self
@@ -1116,8 +1120,7 @@
             metrics.num_inserted += 1;
         }
 
-        if let HashMapEntry::Vacant(entry) = just_received_coding_shreds.entry((slot, shred_index))
-        {
+        if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) {
             metrics.num_coding_shreds_inserted += 1;
             entry.insert(shred);
         }
@@ -1130,30 +1133,27 @@
         shred: &Shred,
         slot: Slot,
         erasure_meta: &ErasureMeta,
-        just_received_coding_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        just_received_shreds: &HashMap<ShredId, Shred>,
     ) -> Option<Vec<u8>> {
         // Search for the shred which set the initial erasure config, either inserted,
-        // or in the current batch in just_received_coding_shreds.
-        let mut conflicting_shred = None;
+        // or in the current batch in just_received_shreds.
         for coding_index in erasure_meta.coding_shreds_indices() {
             let maybe_shred = self.get_coding_shred(slot, coding_index);
             if let Ok(Some(shred_data)) = maybe_shred {
                 let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
                 if Self::erasure_mismatch(&potential_shred, shred) {
-                    conflicting_shred = Some(potential_shred.payload);
+                    return Some(potential_shred.payload);
                 }
                 break;
-            } else if let Some(potential_shred) =
-                just_received_coding_shreds.get(&(slot, coding_index))
-            {
+            } else if let Some(potential_shred) = {
+                let key = ShredId::new(slot, u32::try_from(coding_index).unwrap(), ShredType::Code);
+                just_received_shreds.get(&key)
+            } {
                 if Self::erasure_mismatch(potential_shred, shred) {
-                    conflicting_shred = Some(potential_shred.payload.clone());
+                    return Some(potential_shred.payload.clone());
                 }
                 break;
             }
         }
-
-        conflicting_shred
+        None
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -1164,7 +1164,7 @@
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
        write_batch: &mut WriteBatch,
-        just_inserted_data_shreds: &mut HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        just_inserted_shreds: &mut HashMap<ShredId, Shred>,
         index_meta_time: &mut u64,
         is_trusted: bool,
         handle_duplicate: &F,
@@ -1215,7 +1215,7 @@
             if !self.should_insert_data_shred(
                 &shred,
                 slot_meta,
-                just_inserted_data_shreds,
+                just_inserted_shreds,
                 &self.last_root,
                 leader_schedule,
                 shred_source.clone(),
@@ -1232,7 +1232,7 @@
             write_batch,
             shred_source,
         )?;
-        just_inserted_data_shreds.insert((slot, shred_index), shred);
+        just_inserted_shreds.insert(shred.id(), shred);
         index_meta_working_set_entry.did_insert_occur = true;
         slot_meta_entry.did_insert_occur = true;
         if let HashMapEntry::Vacant(entry) = erasure_metas.entry((slot, set_index)) {
@@ -1276,11 +1276,12 @@
 
     fn get_data_shred_from_just_inserted_or_db<'a>(
         &'a self,
-        just_inserted_data_shreds: &'a HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        just_inserted_shreds: &'a HashMap<ShredId, Shred>,
         slot: Slot,
         index: u64,
     ) -> Cow<'a, Vec<u8>> {
-        if let Some(shred) = just_inserted_data_shreds.get(&(slot, index)) {
+        let key = ShredId::new(slot, u32::try_from(index).unwrap(), ShredType::Data);
+        if let Some(shred) = just_inserted_shreds.get(&key) {
             Cow::Borrowed(&shred.payload)
         } else {
             // If it doesn't exist in the just inserted set, it must exist in
@@ -1293,7 +1294,7 @@
         &self,
         shred: &Shred,
         slot_meta: &SlotMeta,
-        just_inserted_data_shreds: &HashMap<(Slot, /*shred index:*/ u64), Shred>,
+        just_inserted_shreds: &HashMap<ShredId, Shred>,
         last_root: &RwLock<u64>,
         leader_schedule: Option<&LeaderScheduleCache>,
         shred_source: ShredSource,
@@ -1350,7 +1351,7 @@
                 .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
 
             let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
-                just_inserted_data_shreds,
+                just_inserted_shreds,
                 slot,
                 last_index.unwrap(),
             );
@@ -1386,7 +1387,7 @@
                 .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
 
             let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
-                just_inserted_data_shreds,
+                just_inserted_shreds,
                 slot,
                 slot_meta.received - 1,
             );
@@ -2998,13 +2999,8 @@
     // Returns the existing shred if `new_shred` is not equal to the existing shred at the
     // given slot and index as this implies the leader generated two different shreds with
     // the same slot and index
-    pub fn is_shred_duplicate(
-        &self,
-        slot: u64,
-        index: u32,
-        mut payload: Vec<u8>,
-        shred_type: ShredType,
-    ) -> Option<Vec<u8>> {
+    pub fn is_shred_duplicate(&self, shred: ShredId, mut payload: Vec<u8>) -> Option<Vec<u8>> {
+        let (slot, index, shred_type) = shred.unwrap();
         let existing_shred = match shred_type {
             ShredType::Data => self.get_data_shred(slot, index as u64),
             ShredType::Code => self.get_coding_shred(slot, index as u64),
@@ -8175,19 +8171,15 @@ pub mod tests {
         // Check if shreds are duplicated
         assert_eq!(
             blockstore.is_shred_duplicate(
-                slot,
-                0,
+                ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()),
                 duplicate_shred.payload.clone(),
-                duplicate_shred.shred_type()
             ),
             Some(shred.payload.to_vec())
         );
         assert!(blockstore
             .is_shred_duplicate(
-                slot,
-                0,
+                ShredId::new(slot, /*index:*/ 0, duplicate_shred.shred_type()),
                 non_duplicate_shred.payload,
-                duplicate_shred.shred_type()
             )
             .is_none());
 
@@ -8673,10 +8665,12 @@ pub mod tests {
             std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
         assert!(blockstore
             .is_shred_duplicate(
-                slot,
-                even_smaller_last_shred_duplicate.index(),
+                ShredId::new(
+                    slot,
+                    even_smaller_last_shred_duplicate.index(),
+                    ShredType::Data,
+                ),
                 even_smaller_last_shred_duplicate.payload.clone(),
-                ShredType::Data,
             )
             .is_some());
         blockstore
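One design point worth calling out: because ShredType is part of the key, the previously separate just_inserted_data_shreds and just_inserted_coding_shreds maps could be merged into the single just_inserted_shreds: HashMap<ShredId, Shred> above without data and coding shreds at the same (slot, index) colliding. A toy sketch of that property (the slot and index values are illustrative, not from the diff):

// Toy sketch: a data shred and a coding shred sharing (slot, index) still
// map to distinct keys, so one unified map cannot conflate them.
let data_key = ShredId::new(/*slot:*/ 7, /*index:*/ 3, ShredType::Data);
let code_key = ShredId::new(/*slot:*/ 7, /*index:*/ 3, ShredType::Code);
assert_ne!(data_key, code_key);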