Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use streamer::BlobSender;

pub const MAX_REPAIR_LENGTH: usize = 128;

pub fn repair(
slot: u64,
db_ledger: &DbLedger,
Expand Down Expand Up @@ -79,7 +81,13 @@ pub fn repair(
max_entry_height + 2
};

let idxs = find_missing_data_indexes(slot, db_ledger, consumed, max_repair_entry_height - 1);
let idxs = find_missing_data_indexes(
slot,
db_ledger,
consumed,
max_repair_entry_height - 1,
MAX_REPAIR_LENGTH,
);

let reqs: Vec<_> = idxs
.into_iter()
Expand Down Expand Up @@ -117,8 +125,9 @@ pub fn find_missing_indexes(
end_index: u64,
key: &Fn(u64, u64) -> Vec<u8>,
index_from_key: &Fn(&[u8]) -> Result<u64>,
max_missing: usize,
) -> Vec<u64> {
if start_index >= end_index {
if start_index >= end_index || max_missing == 0 {
return vec![];
}

Expand All @@ -129,7 +138,7 @@ pub fn find_missing_indexes(

// The index of the first missing blob in the slot
let mut prev_index = start_index;
loop {
'outer: loop {
if !db_iterator.valid() {
break;
}
Expand All @@ -139,6 +148,9 @@ pub fn find_missing_indexes(
let upper_index = cmp::min(current_index, end_index);
for i in prev_index..upper_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break 'outer;
}
}
if current_index >= end_index {
break;
Expand All @@ -156,6 +168,7 @@ pub fn find_missing_data_indexes(
db_ledger: &DbLedger,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
let mut db_iterator = db_ledger
.db
Expand All @@ -169,6 +182,7 @@ pub fn find_missing_data_indexes(
end_index,
&DataCf::key,
&DataCf::index_from_key,
max_missing,
)
}

Expand All @@ -177,6 +191,7 @@ pub fn find_missing_coding_indexes(
db_ledger: &DbLedger,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
let mut db_iterator = db_ledger
.db
Expand All @@ -190,6 +205,7 @@ pub fn find_missing_coding_indexes(
end_index,
&ErasureCf::key,
&ErasureCf::index_from_key,
max_missing,
)
}

Expand Down Expand Up @@ -463,9 +479,10 @@ mod test {

// Early exit conditions
let empty: Vec<u64> = vec![];
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 0, 0), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 5, 5), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 4, 3), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 0, 0, 1), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 5, 5, 1), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 4, 3, 1), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 1, 2, 0), empty);

let shared_blob = &make_tiny_test_entries(1).to_blobs()[0];
let first_index = 10;
Expand All @@ -483,7 +500,13 @@ mod test {
// given the input range of [i, first_index], the missing indexes should be
// [i, first_index - 1]
for i in 0..first_index {
let result = find_missing_data_indexes(slot, &db_ledger, i, first_index);
let result = find_missing_data_indexes(
slot,
&db_ledger,
i,
first_index,
(first_index - i) as usize,
);
let expected: Vec<u64> = (i..first_index).collect();

assert_eq!(result, expected);
Expand All @@ -503,6 +526,7 @@ mod test {

// Write entries
let gap = 10;
assert!(gap > 3);
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) {
Expand All @@ -518,17 +542,29 @@ mod test {
// range of [0, gap)
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 0, gap),
find_missing_data_indexes(slot, &db_ledger, 0, gap, gap as usize),
expected
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 1, gap),
find_missing_data_indexes(slot, &db_ledger, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 0, gap - 1),
find_missing_data_indexes(slot, &db_ledger, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 0, gap, 1),
vec![1],
);

for i in 0..num_entries as u64 {
for j in 0..i {
Expand All @@ -539,7 +575,13 @@ mod test {
(begin..end)
}).collect();
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, j * gap, i * gap),
find_missing_data_indexes(
slot,
&db_ledger,
j * gap,
i * gap,
((i - j) * gap) as usize
),
expected,
);
}
Expand Down Expand Up @@ -567,7 +609,10 @@ mod test {
let empty: Vec<u64> = vec![];
for i in 0..num_entries as u64 {
for j in 0..i {
assert_eq!(find_missing_data_indexes(slot, &db_ledger, j, i), empty);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, j, i, (i - j) as usize),
empty
);
}
}

Expand Down