Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ impl Blockstore {
if let Some(shred) = prev_inserted_shreds.get(&key) {
return Some(shred.clone());
}
if !index.data().is_present(i) {
if !index.data().contains(i) {
return None;
}
match data_cf.get_bytes((slot, i)).unwrap() {
Expand All @@ -662,7 +662,7 @@ impl Blockstore {
if let Some(shred) = prev_inserted_shreds.get(&key) {
return Some(shred.clone());
}
if !index.coding().is_present(i) {
if !index.coding().contains(i) {
return None;
}
match code_cf.get_bytes((slot, i)).unwrap() {
Expand Down Expand Up @@ -1051,7 +1051,7 @@ impl Blockstore {
// So, all coding shreds in a given FEC block will have the same set index

if !is_trusted {
if index_meta.coding().is_present(shred_index) {
if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1;
handle_duplicate(shred);
return false;
Expand Down Expand Up @@ -1260,15 +1260,15 @@ impl Blockstore {
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
index_meta.coding_mut().set_present(shred_index, true);
index_meta.coding_mut().insert(shred_index);

Ok(())
}

fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index());
// Check that the shred doesn't already exist in blockstore
shred_index < slot_meta.consumed || data_index.is_present(shred_index)
shred_index < slot_meta.consumed || data_index.contains(shred_index)
}

fn get_data_shred_from_just_inserted_or_db<'a>(
Expand Down Expand Up @@ -1454,7 +1454,7 @@ impl Blockstore {
let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1;

while data_index.is_present(current_index) {
while data_index.contains(current_index) {
current_index += 1;
}
current_index
Expand All @@ -1470,7 +1470,7 @@ impl Blockstore {
// But only need to store the bytes within data_header.size
&shred.payload[..shred.data_header.size as usize],
)?;
data_index.set_present(index, true);
data_index.insert(index);
let newly_completed_data_sets = update_slot_meta(
last_in_slot,
last_in_data,
Expand Down Expand Up @@ -3164,7 +3164,7 @@ fn update_completed_data_indexes(
.filter(|ix| {
let (begin, end) = (ix[0] as u64, ix[1] as u64);
let num_shreds = (end - begin) as usize;
received_data_shreds.present_in_bounds(begin..end) == num_shreds
received_data_shreds.range(begin..end).count() == num_shreds
})
.map(|ix| (ix[0], ix[1] - 1))
.collect()
Expand Down Expand Up @@ -8040,7 +8040,7 @@ pub mod tests {
// Test that iterator and individual shred lookup yield same set
assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
// Test that the data index has current shred accounted for
assert!(shred_index.data().is_present(index));
assert!(shred_index.data().contains(index));
}

// Test the data index doesn't have anything extra
Expand All @@ -8054,7 +8054,7 @@ pub mod tests {
// Test that the iterator and individual shred lookup yield same set
assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some());
// Test that the coding index has current shred accounted for
assert!(shred_index.coding().is_present(index));
assert!(shred_index.coding().contains(index));
}

// Test the data index doesn't have anything extra
Expand Down Expand Up @@ -8164,7 +8164,7 @@ pub mod tests {
let mut shred_index = ShredIndex::default();

for i in 0..10 {
shred_index.set_present(i as u64, true);
shred_index.insert(i as u64);
assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)]
Expand All @@ -8178,21 +8178,21 @@ pub mod tests {
let mut completed_data_indexes = BTreeSet::default();
let mut shred_index = ShredIndex::default();

shred_index.set_present(4, true);
shred_index.insert(4);
assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());

shred_index.set_present(2, true);
shred_index.insert(2);
assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());

shred_index.set_present(3, true);
shred_index.insert(3);
assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty()
Expand All @@ -8201,7 +8201,7 @@ pub mod tests {

// Inserting data complete shred 1 now confirms the range of shreds [2, 3]
// is part of the same data set
shred_index.set_present(1, true);
shred_index.insert(1);
assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)]
Expand All @@ -8210,7 +8210,7 @@ pub mod tests {

// Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set
shred_index.set_present(0, true);
shred_index.insert(0);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
Expand Down Expand Up @@ -8510,7 +8510,7 @@ pub mod tests {
assert_eq!(meta.consumed, shreds.len() as u64);
let shreds_index = blockstore.get_index(slot).unwrap().unwrap();
for i in 0..shreds.len() as u64 {
assert!(shreds_index.data().is_present(i));
assert!(shreds_index.data().contains(i));
}

// Cleanup the slot
Expand Down
60 changes: 23 additions & 37 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ impl Index {
&self.coding
}

pub fn data_mut(&mut self) -> &mut ShredIndex {
pub(crate) fn data_mut(&mut self) -> &mut ShredIndex {
&mut self.data
}
pub fn coding_mut(&mut self) -> &mut ShredIndex {
pub(crate) fn coding_mut(&mut self) -> &mut ShredIndex {
&mut self.coding
}
}
Expand All @@ -164,30 +164,19 @@ impl ShredIndex {
self.index.len()
}

pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index.range(bounds).count()
pub(crate) fn range<R>(&self, bounds: R) -> impl Iterator<Item = &u64>
where
R: RangeBounds<u64>,
{
self.index.range(bounds)
}

pub fn is_present(&self, index: u64) -> bool {
pub(crate) fn contains(&self, index: u64) -> bool {
self.index.contains(&index)
}

pub fn set_present(&mut self, index: u64, presence: bool) {
if presence {
self.index.insert(index);
} else {
self.index.remove(&index);
}
}

pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
for (idx, present) in presence.into_iter() {
self.set_present(idx, present);
}
}

pub fn largest(&self) -> Option<u64> {
self.index.iter().rev().next().copied()
pub(crate) fn insert(&mut self, index: u64) {
self.index.insert(index);
}
}

Expand Down Expand Up @@ -301,10 +290,8 @@ impl ErasureMeta {
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
use ErasureMetaStatus::*;

let num_coding = index
.coding()
.present_in_bounds(self.coding_shreds_indices());
let num_data = index.data().present_in_bounds(self.data_shreds_indices());
let num_coding = index.coding().range(self.coding_shreds_indices()).count();
let num_data = index.data().range(self.data_shreds_indices()).count();

let (data_missing, num_needed) = (
self.config.num_data().saturating_sub(num_data),
Expand Down Expand Up @@ -355,7 +342,6 @@ mod test {
use {
super::*,
rand::{seq::SliceRandom, thread_rng},
std::iter::repeat,
};

#[test]
Expand All @@ -379,35 +365,35 @@ mod test {

assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data()));

index
.data_mut()
.set_many_present(data_indexes.clone().zip(repeat(true)));
for ix in data_indexes.clone() {
index.data_mut().insert(ix);
}

assert_eq!(e_meta.status(&index), DataFull);

index
.coding_mut()
.set_many_present(coding_indexes.clone().zip(repeat(true)));
for ix in coding_indexes.clone() {
index.coding_mut().insert(ix);
}

for &idx in data_indexes
.clone()
.collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_data())
{
index.data_mut().set_present(idx, false);
index.data_mut().index.remove(&idx);

assert_eq!(e_meta.status(&index), CanRecover);
}

index
.data_mut()
.set_many_present(data_indexes.zip(repeat(true)));
for ix in data_indexes {
index.data_mut().insert(ix);
}

for &idx in coding_indexes
.collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_coding())
{
index.coding_mut().set_present(idx, false);
index.coding_mut().index.remove(&idx);

assert_eq!(e_meta.status(&index), DataFull);
}
Expand Down