Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/db_ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ impl LedgerColumnFamilyRaw for DataCf {
pub struct ErasureCf {}

impl ErasureCf {
pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> {
let key = Self::key(slot_height, index);
self.delete(db, &key)
}

pub fn get_by_slot_index(
&self,
db: &DB,
Expand Down Expand Up @@ -270,34 +275,37 @@ impl DbLedger {
Ok(())
}

pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
{
let mut entries = vec![];
for b in shared_blobs {
let bl = b.borrow().read().unwrap();
let index = bl.index()?;
let key = DataCf::key(slot, index);
self.insert_data_blob(&key, &*bl)?;
let new_entries = self.insert_data_blob(&key, &*bl)?;
entries.extend(new_entries);
}

Ok(())
Ok(entries)
}

pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator<Item = &'a &'a Blob>,
{
let mut entries = vec![];
for blob in blobs.into_iter() {
let index = blob.index()?;
let key = DataCf::key(slot, index);
self.insert_data_blob(&key, blob)?;
let new_entries = self.insert_data_blob(&key, blob)?;
entries.extend(new_entries);
}
Ok(())
Ok(entries)
}

pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<Vec<Entry>>
where
I: IntoIterator,
I::Item: Borrow<Entry>,
Expand Down
113 changes: 112 additions & 1 deletion src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use cluster_info::ClusterInfo;
use counter::Counter;
use db_ledger::*;
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use leader_scheduler::LeaderScheduler;
use log::Level;
use packet::{SharedBlob, BLOB_HEADER_SIZE};
Expand Down Expand Up @@ -140,6 +142,12 @@ pub fn find_missing_indexes(
let mut prev_index = start_index;
'outer: loop {
if !db_iterator.valid() {
for i in prev_index..end_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break;
}
}
break;
}
let current_key = db_iterator.key().expect("Expect a valid key");
Expand Down Expand Up @@ -303,7 +311,19 @@ pub fn process_blob(
db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
};

// TODO: Once erasure is fixed, readd that logic here
#[cfg(feature = "erasure")]
{
// If write_shared_blobs() of these recovered blobs fails fails, don't return
// because consumed_entries might be nonempty from earlier, and tick height needs to
// be updated. Hopefully we can recover these blobs next time successfully.
if let Err(e) = try_erasure(db_ledger, slot, consume_queue) {
trace!(
"erasure::recover failed to write recovered coding blobs. Err: {:?}",
e
);
}
}

for entry in &consumed_entries {
*tick_height += entry.is_tick() as u64;
}
Expand Down Expand Up @@ -352,9 +372,39 @@ pub fn calculate_max_repair_entry_height(
}
}

#[cfg(feature = "erasure")]
fn try_erasure(db_ledger: &mut DbLedger, slot: u64, consume_queue: &mut Vec<Entry>) -> Result<()> {
let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?;
if let Some(meta) = meta {
let (data, coding) = erasure::recover(db_ledger, slot, meta.consumed)?;
for c in coding {
let cl = c.read().unwrap();
let erasure_key =
ErasureCf::key(slot, cl.index().expect("Recovered blob must set index"));
let size = cl.size().expect("Recovered blob must set size");
db_ledger.erasure_cf.put(
&db_ledger.db,
&erasure_key,
&cl.data[..BLOB_HEADER_SIZE + size],
)?;
}

let entries = db_ledger.write_shared_blobs(slot, data)?;
consume_queue.extend(entries);
}

Ok(())
}

#[cfg(test)]
mod test {
use super::*;
#[cfg(all(feature = "erasure", test))]
use entry::reconstruct_entries_from_blobs;
#[cfg(all(feature = "erasure", test))]
use erasure::test::{generate_db_ledger_from_window, setup_window_ledger};
#[cfg(all(feature = "erasure", test))]
use erasure::{NUM_CODING, NUM_DATA};
use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use rocksdb::{Options, DB};
Expand Down Expand Up @@ -572,6 +622,18 @@ mod test {
vec![1],
);

// Test with end indexes that are greater than the last item in the ledger
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
find_missing_data_indexes(slot, &db_ledger, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);

for i in 0..num_entries as u64 {
for j in 0..i {
let expected: Vec<u64> = (j..i)
Expand Down Expand Up @@ -626,4 +688,53 @@ mod test {
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destruction");
}

#[cfg(all(feature = "erasure", test))]
#[test]
pub fn test_try_erasure() {
// Setup the window
let offset = 0;
let num_blobs = NUM_DATA + 2;
let slot_height = DEFAULT_SLOT_HEIGHT;
let mut window = setup_window_ledger(offset, num_blobs, false, slot_height);
let end_index = (offset + num_blobs) % window.len();

// Test erasing a data block and an erasure block
let coding_start = offset - (offset % NUM_DATA) + (NUM_DATA - NUM_CODING);

let erase_offset = coding_start % window.len();

// Create a hole in the window
let erased_data = window[erase_offset].data.clone();
let erased_coding = window[erase_offset].coding.clone().unwrap();
window[erase_offset].data = None;
window[erase_offset].coding = None;

// Generate the db_ledger from the window
let ledger_path = get_tmp_ledger_path("test_try_erasure");
let mut db_ledger =
generate_db_ledger_from_window(&ledger_path, &window, slot_height, false);

let mut consume_queue = vec![];
try_erasure(&mut db_ledger, slot_height, &mut consume_queue)
.expect("Expected successful erasure attempt");
window[erase_offset].data = erased_data;

let data_blobs: Vec<_> = window[erase_offset..end_index]
.iter()
.map(|slot| slot.data.clone().unwrap())
.collect();
let (expected, _) = reconstruct_entries_from_blobs(data_blobs).unwrap();
assert_eq!(consume_queue, expected);

let erased_coding_l = erased_coding.read().unwrap();
assert_eq!(
&db_ledger
.erasure_cf
.get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64)
.unwrap()
.unwrap()[BLOB_HEADER_SIZE..],
&erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
);
}
}
Loading