Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 103 additions & 133 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5339,7 +5339,7 @@ pub mod tests {
crate::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule},
shred::{max_ticks_per_n_shreds, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
shred::{max_ticks_per_n_shreds, ShredFlags},
},
assert_matches::assert_matches,
bincode::{serialize, Options},
Expand All @@ -5366,7 +5366,7 @@ pub mod tests {
solana_transaction_status::{
InnerInstruction, InnerInstructions, Reward, Rewards, TransactionTokenBalance,
},
std::{cmp::Ordering, thread::Builder, time::Duration},
std::{cmp::Ordering, time::Duration},
test_case::test_case,
};

Expand Down Expand Up @@ -10773,38 +10773,74 @@ pub mod tests {
assert!(!blockstore.is_dead(0));
}

/// Prepare two FEC sets of shreds for the same slot index
/// with reasonable shred indices, but in such a way that
/// both FEC sets include a shred with LAST_IN_SLOT flag set.
#[allow(clippy::type_complexity)]
fn setup_duplicate_last_in_slot(
slot: Slot,
) -> ((Vec<Shred>, Vec<Shred>), (Vec<Shred>, Vec<Shred>)) {
let entries = make_slot_entries_with_transactions(1);
let leader_keypair = Arc::new(Keypair::new());
let reed_solomon_cache = ReedSolomonCache::default();
let shredder = Shredder::new(slot, 0, 0, 0).unwrap();
let (shreds1, code1): (Vec<Shred>, Vec<Shred>) = shredder
.make_merkle_shreds_from_entries(
&leader_keypair,
&entries,
true, // is_last_in_slot
Some(Hash::new_unique()),
0, // next_shred_index
0, // next_code_index,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
)
.partition(Shred::is_data);
let last_data1 = shreds1.last().unwrap();
let last_code1 = code1.last().unwrap();

let (shreds2, code2) = shredder
.make_merkle_shreds_from_entries(
&leader_keypair,
&entries,
true, // is_last_in_slot
Some(last_data1.chained_merkle_root().unwrap()),
last_data1.index() + 1, // next_shred_index
last_code1.index() + 1, // next_code_index,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
)
.partition(Shred::is_data);
((shreds1, code1), (shreds2, code2))
}

#[test]
fn test_duplicate_last_index() {
let num_shreds = 2;
let num_entries = max_ticks_per_n_shreds(num_shreds, None);
let slot = 1;
let (mut shreds, _) =
make_slot_entries(slot, 0, num_entries, /*merkle_variant:*/ false);
let ((shreds1, _code1), (shreds2, _code2)) = setup_duplicate_last_in_slot(slot);

// Mark both as last shred
shreds[0].set_last_in_slot();
shreds[1].set_last_in_slot();
let last_data1 = shreds1.last().unwrap();
let last_data2 = shreds2.last().unwrap();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore
.insert_shreds(vec![last_data1.clone(), last_data2.clone()], None, false)
.unwrap();

assert!(blockstore.get_duplicate_slot(slot).is_some());
}

#[test]
fn test_duplicate_last_index_mark_dead() {
let num_shreds = 10;
let smaller_last_shred_index = 5;
let smaller_last_shred_index = 31;
let larger_last_shred_index = 8;

let setup_test_shreds = |slot: Slot| -> Vec<Shred> {
let num_entries = max_ticks_per_n_shreds(num_shreds, Some(LEGACY_SHRED_DATA_CAPACITY));
let (mut shreds, _) =
make_slot_entries(slot, 0, num_entries, /*merkle_variant:*/ false);
shreds[smaller_last_shred_index].set_last_in_slot();
shreds[larger_last_shred_index].set_last_in_slot();
shreds
let ((mut shreds1, _code1), (mut shreds2, _code2)) = setup_duplicate_last_in_slot(slot);
shreds1.append(&mut shreds2);
shreds1
};

let get_expected_slot_meta_and_index_meta =
Expand Down Expand Up @@ -10863,38 +10899,6 @@ pub mod tests {
assert_eq!(meta, expected_slot_meta);
assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);

// Case 2: Inserting a duplicate with an even smaller last shred index should not
// mark the slot as dead since the Slotmeta is full.
let even_smaller_last_shred_duplicate = {
let mut payload = shreds[smaller_last_shred_index - 1].payload().clone();
// Flip a byte to create a duplicate shred
payload[0] = u8::MAX - payload[0];
let mut shred = Shred::new_from_serialized_shred(payload).unwrap();
shred.set_last_in_slot();
shred
};
assert!(blockstore
.is_shred_duplicate(&even_smaller_last_shred_duplicate)
.is_some());
blockstore
.insert_shreds(vec![even_smaller_last_shred_duplicate], None, false)
.unwrap();
assert!(!blockstore.is_dead(slot));
for i in 0..num_shreds {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
shreds[i as usize].payload().as_ref(),
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
}
}
let mut meta = blockstore.meta(slot).unwrap().unwrap();
meta.first_shred_timestamp = expected_slot_meta.first_shred_timestamp;
assert_eq!(meta, expected_slot_meta);
assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);

// Case 3: Insert shreds in reverse so that consumed will not be updated. Now on insert, the
// the slot should be marked as dead
slot += 1;
Expand Down Expand Up @@ -10964,104 +10968,70 @@ pub mod tests {

#[test]
fn test_get_slot_entries_dead_slot_race() {
let setup_test_shreds = move |slot: Slot| -> Vec<Shred> {
let num_shreds = 10;
let middle_shred_index = 5;
let num_entries = max_ticks_per_n_shreds(num_shreds, None);
let (shreds, _) =
make_slot_entries(slot, 0, num_entries, /*merkle_variant:*/ false);

// Reverse shreds so that last shred gets inserted first and sets meta.received
let mut shreds: Vec<Shred> = shreds.into_iter().rev().collect();

// Push the real middle shred to the end of the shreds list
shreds.push(shreds[middle_shred_index].clone());

// Set the middle shred as a last shred to cause the slot to be marked dead
shreds[middle_shred_index].set_last_in_slot();
shreds
};

let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let (slot_sender, slot_receiver) = unbounded();
let (shred_sender, shred_receiver) = unbounded::<Vec<Shred>>();
let (signal_sender, signal_receiver) = unbounded();

let t_entry_getter = {
let blockstore = blockstore.clone();
let signal_sender = signal_sender.clone();
Builder::new()
.spawn(move || {
while let Ok(slot) = slot_receiver.recv() {
match blockstore.get_slot_entries_with_shred_info(slot, 0, false) {
Ok((_entries, _num_shreds, is_full)) => {
if is_full {
signal_sender
.send(Err(IoError::other(
"got full slot entries for dead slot",
)))
.unwrap();
}
}
Err(err) => {
assert_matches!(err, BlockstoreError::DeadSlot);
std::thread::scope(|scope| {
scope.spawn(|| {
while let Ok(slot) = slot_receiver.recv() {
match blockstore.get_slot_entries_with_shred_info(slot, 0, false) {
Ok((_entries, _num_shreds, is_full)) => {
if is_full {
signal_sender
.send(Err(IoError::other(
"got full slot entries for dead slot",
)))
.unwrap();
}
}
signal_sender.send(Ok(())).unwrap();
}
})
.unwrap()
};

let t_shred_inserter = {
let blockstore = blockstore.clone();
Builder::new()
.spawn(move || {
while let Ok(shreds) = shred_receiver.recv() {
let slot = shreds[0].slot();
// Grab this lock to block `get_slot_entries` before it fetches completed datasets
// and then mark the slot as dead, but full, by inserting carefully crafted shreds.

#[allow(clippy::readonly_write_lock)]
// Possible clippy bug, the lock is unused so clippy shouldn't care
// about read vs. write lock
let _lowest_cleanup_slot =
blockstore.lowest_cleanup_slot.write().unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(blockstore.get_duplicate_slot(slot).is_some());
assert!(blockstore.is_dead(slot));
assert!(blockstore.meta(slot).unwrap().unwrap().is_full());
signal_sender.send(Ok(())).unwrap();
Err(err) => {
assert_matches!(err, BlockstoreError::DeadSlot);
}
}
})
.unwrap()
};

for slot in 0..100 {
let shreds = setup_test_shreds(slot);
signal_sender.send(Ok(())).unwrap();
}
});

// Start a task on each thread to trigger a race condition
slot_sender.send(slot).unwrap();
shred_sender.send(shreds).unwrap();
scope.spawn(|| {
while let Ok(shreds) = shred_receiver.recv() {
let slot = shreds[0].slot();
// Grab this lock to block `get_slot_entries` before it fetches completed datasets
// and then mark the slot as dead, but full, by inserting carefully crafted shreds.

#[allow(clippy::readonly_write_lock)]
// Possible clippy bug, the lock is unused so clippy shouldn't care
// about read vs. write lock
let _lowest_cleanup_slot = blockstore.lowest_cleanup_slot.write().unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(blockstore.get_duplicate_slot(slot).is_some());
assert!(blockstore.is_dead(slot));
signal_sender.send(Ok(())).unwrap();
}
});

// Check that each thread processed their task before continuing
for _ in 1..=2 {
let res = signal_receiver.recv().unwrap();
assert!(res.is_ok(), "race condition: {res:?}");
for slot in 0..100 {
let ((mut shreds1, _), (mut shreds2, _)) = setup_duplicate_last_in_slot(slot);
// compose shreds in reverse order of FEC sets to
// make sure slot is marked dead
shreds2.append(&mut shreds1);
// Start a task on each thread to trigger a race condition
slot_sender.send(slot).unwrap();
shred_sender.send(shreds2).unwrap();

// Check that each thread processed their task before continuing
for _ in 1..=2 {
let res = signal_receiver.recv().unwrap();
assert!(res.is_ok(), "race condition: {res:?}");
}
}
}

drop(slot_sender);
drop(shred_sender);

let handles = vec![t_entry_getter, t_shred_inserter];
for handle in handles {
assert!(handle.join().is_ok());
}

assert!(Arc::strong_count(&blockstore) == 1);
drop(slot_sender);
drop(shred_sender);
});
}
}

Expand Down
Loading