Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,14 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
let data_shreds = shredder
.entries_to_data_shreds(
&Keypair::new(),
&entries,
true, // is_last_in_slot
0, // next_shred_index
0, // fec_set_offset
&mut ProcessShredsStats::default(),
)
.0;
let data_shreds = shredder.entries_to_data_shreds(
&Keypair::new(),
&entries,
true, // is_last_in_slot
0, // next_shred_index
0, // fec_set_offset
&mut ProcessShredsStats::default(),
);
assert!(data_shreds.len() >= num_shreds);
data_shreds
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, _, _) = shredder.entries_to_shreds(
let (data_shreds, _) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
Expand All @@ -163,10 +163,10 @@ impl BroadcastRun for BroadcastDuplicatesRun {

self.next_shred_index += data_shreds.len() as u32;
let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
let (original_last_data_shred, _, _) =
let (original_last_data_shred, _) =
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index);

let (partition_last_data_shred, _, _) =
let (partition_last_data_shred, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index);
Expand Down
4 changes: 2 additions & 2 deletions core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
Expand All @@ -69,7 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
.map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
.collect();

let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds(
let (fake_data_shreds, fake_coding_shreds) = shredder.entries_to_shreds(
keypair,
&fake_entries,
last_tick_height == bank.max_tick_height(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, _, _) = shredder.entries_to_shreds(
let (data_shreds, _) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
Expand All @@ -92,10 +92,10 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {

self.next_shred_index += data_shreds.len() as u32;
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
let (good_last_data_shred, _, _) =
let (good_last_data_shred, _) =
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index);

let (bad_last_data_shred, _, _) =
let (bad_last_data_shred, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index);
Expand Down
25 changes: 14 additions & 11 deletions core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,16 @@ impl StandardBroadcastRun {
None => (0, 0),
},
};
let (data_shreds, next_shred_index) =
Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
.unwrap()
.entries_to_data_shreds(
keypair,
entries,
is_slot_end,
next_shred_index,
fec_set_offset,
process_stats,
);
let data_shreds = Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
.unwrap()
.entries_to_data_shreds(
keypair,
entries,
is_slot_end,
next_shred_index,
fec_set_offset,
process_stats,
);
let mut data_shreds_buffer = match &mut self.unfinished_slot {
Some(state) => {
assert_eq!(state.slot, slot);
Expand All @@ -138,6 +137,10 @@ impl StandardBroadcastRun {
None => Vec::default(),
};
data_shreds_buffer.extend(data_shreds.clone());
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1,
None => next_shred_index,
};
self.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index,
slot,
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub(crate) mod tests {
})
.take(5)
.collect();
let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds(
let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds(
keypair,
&entries,
true, // is_last_in_slot
Expand Down
11 changes: 5 additions & 6 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,7 @@ impl Blockstore {
0
}
};
let (mut data_shreds, mut coding_shreds, _) =
let (mut data_shreds, mut coding_shreds) =
shredder.entries_to_shreds(keypair, &current_entries, true, start_index);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
Expand All @@ -1716,7 +1716,7 @@ impl Blockstore {
}

if !slot_entries.is_empty() {
let (mut data_shreds, mut coding_shreds, _) =
let (mut data_shreds, mut coding_shreds) =
shredder.entries_to_shreds(keypair, &slot_entries, is_full_slot, 0);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
Expand Down Expand Up @@ -8104,7 +8104,7 @@ pub mod tests {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap();
let (data_shreds, coding_shreds, _) =
let (data_shreds, coding_shreds) =
shredder.entries_to_shreds(&leader_keypair, &entries, true, 0);

let genesis_config = create_genesis_config(2).genesis_config;
Expand Down Expand Up @@ -8160,9 +8160,8 @@ pub mod tests {
let entries2 = make_slot_entries_with_transactions(1);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(slot, 0, 0, 0).unwrap();
let (shreds, _, _) = shredder.entries_to_shreds(&leader_keypair, &entries1, true, 0);
let (duplicate_shreds, _, _) =
shredder.entries_to_shreds(&leader_keypair, &entries2, true, 0);
let (shreds, _) = shredder.entries_to_shreds(&leader_keypair, &entries1, true, 0);
let (duplicate_shreds, _) = shredder.entries_to_shreds(&leader_keypair, &entries2, true, 0);
let shred = shreds[0].clone();
let duplicate_shred = duplicate_shreds[0].clone();
let non_duplicate_shred = shred.clone();
Expand Down
33 changes: 17 additions & 16 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,9 +759,12 @@ impl Shredder {
entries: &[Entry],
is_last_in_slot: bool,
next_shred_index: u32,
) -> (Vec<Shred>, Vec<Shred>, u32) {
) -> (
Vec<Shred>, // data shreds
Vec<Shred>, // coding shreds
) {
let mut stats = ProcessShredsStats::default();
let (data_shreds, last_shred_index) = self.entries_to_data_shreds(
let data_shreds = self.entries_to_data_shreds(
keypair,
entries,
is_last_in_slot,
Expand All @@ -772,7 +775,7 @@ impl Shredder {
let coding_shreds =
Self::data_shreds_to_coding_shreds(keypair, &data_shreds, is_last_in_slot, &mut stats)
.unwrap();
(data_shreds, coding_shreds, last_shred_index)
(data_shreds, coding_shreds)
}

// Each FEC block has maximum MAX_DATA_SHREDS_PER_FEC_BLOCK shreds.
Expand All @@ -794,7 +797,7 @@ impl Shredder {
// Shred index offset at which FEC sets are generated.
fec_set_offset: u32,
process_stats: &mut ProcessShredsStats,
) -> (Vec<Shred>, u32) {
) -> Vec<Shred> {
let mut serialize_time = Measure::start("shred_serialize");
let serialized_shreds =
bincode::serialize(entries).expect("Expect to serialize all entries");
Expand Down Expand Up @@ -842,7 +845,7 @@ impl Shredder {
process_stats.serialize_elapsed += serialize_time.as_us();
process_stats.gen_data_elapsed += gen_data_time.as_us();

(data_shreds, last_shred_index + 1)
data_shreds
}

pub fn data_shreds_to_coding_shreds(
Expand Down Expand Up @@ -1305,8 +1308,9 @@ pub mod tests {
.saturating_sub(num_expected_data_shreds as usize)
.max(num_expected_data_shreds as usize);
let start_index = 0;
let (data_shreds, coding_shreds, next_index) =
let (data_shreds, coding_shreds) =
shredder.entries_to_shreds(&keypair, &entries, true, start_index);
let next_index = data_shreds.last().unwrap().index() + 1;
assert_eq!(next_index as u64, num_expected_data_shreds);

let mut data_shred_indexes = HashSet::new();
Expand Down Expand Up @@ -1458,8 +1462,7 @@ pub mod tests {
})
.collect();

let (data_shreds, coding_shreds, _) =
shredder.entries_to_shreds(&keypair, &entries, true, 0);
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0);

for (i, s) in data_shreds.iter().enumerate() {
verify_test_data_shred(
Expand Down Expand Up @@ -1507,7 +1510,7 @@ pub mod tests {
.collect();

let serialized_entries = bincode::serialize(&entries).unwrap();
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
&entries,
is_last_in_slot,
Expand Down Expand Up @@ -1638,8 +1641,7 @@ pub mod tests {
// Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds
// and 2 missing coding shreds. Hint: should work
let serialized_entries = bincode::serialize(&entries).unwrap();
let (data_shreds, coding_shreds, _) =
shredder.entries_to_shreds(&keypair, &entries, true, 25);
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 25);
// We should have 10 shreds now
assert_eq!(data_shreds.len(), num_data_shreds);

Expand Down Expand Up @@ -1723,7 +1725,7 @@ pub mod tests {
)
.unwrap();
let next_shred_index = rng.gen_range(1, 1024);
let (data_shreds, coding_shreds, _) =
let (data_shreds, coding_shreds) =
shredder.entries_to_shreds(&keypair, &[entry], is_last_in_slot, next_shred_index);
let num_data_shreds = data_shreds.len();
let mut shreds = coding_shreds;
Expand Down Expand Up @@ -1777,8 +1779,7 @@ pub mod tests {
})
.collect();

let (data_shreds, coding_shreds, _next_index) =
shredder.entries_to_shreds(&keypair, &entries, true, 0);
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0);
assert!(!data_shreds
.iter()
.chain(coding_shreds.iter())
Expand Down Expand Up @@ -1826,7 +1827,7 @@ pub mod tests {
.collect();

let start_index = 0x12;
let (data_shreds, coding_shreds, _next_index) =
let (data_shreds, coding_shreds) =
shredder.entries_to_shreds(&keypair, &entries, true, start_index);

let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
Expand Down Expand Up @@ -1863,7 +1864,7 @@ pub mod tests {

let mut stats = ProcessShredsStats::default();
let start_index = 0x12;
let (data_shreds, _next_index) = shredder.entries_to_data_shreds(
let data_shreds = shredder.entries_to_data_shreds(
&keypair,
&entries,
true, // is_last_in_slot
Expand Down
8 changes: 4 additions & 4 deletions ledger/tests/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ fn test_multi_fec_block_coding() {
.collect();

let serialized_entries = bincode::serialize(&entries).unwrap();
let (data_shreds, coding_shreds, next_index) =
shredder.entries_to_shreds(&keypair, &entries, true, 0);
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&keypair, &entries, true, 0);
let next_index = data_shreds.last().unwrap().index() + 1;
assert_eq!(next_index as usize, num_data_shreds);
assert_eq!(data_shreds.len(), num_data_shreds);
assert_eq!(coding_shreds.len(), num_data_shreds);
Expand Down Expand Up @@ -218,7 +218,7 @@ fn setup_different_sized_fec_blocks(
let total_num_data_shreds: usize = 2 * num_shreds_per_iter;
for i in 0..2 {
let is_last = i == 1;
let (data_shreds, coding_shreds, new_next_index) =
let (data_shreds, coding_shreds) =
shredder.entries_to_shreds(&keypair, &entries, is_last, next_index);
for shred in &data_shreds {
if (shred.index() as usize) == total_num_data_shreds - 1 {
Expand All @@ -232,7 +232,7 @@ fn setup_different_sized_fec_blocks(
}
}
assert_eq!(data_shreds.len(), num_shreds_per_iter as usize);
next_index = new_next_index;
next_index = data_shreds.last().unwrap().index() + 1;
sort_data_coding_into_fec_sets(
data_shreds,
coding_shreds,
Expand Down