Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let slot = 0;
let parent = 0;
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
let mut data_shreds = shredder.entries_to_shreds(&keypair, &entries, true, 0).0;
let (mut data_shreds, _) = shredder.entries_to_shreds(
&keypair, &entries, true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
);

let num_packets = data_shreds.len();

Expand Down
8 changes: 5 additions & 3 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
let entries = create_ticks(num_ticks, 0, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(&kp, &entries, true, 0);
shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
})
}

Expand All @@ -93,7 +93,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(&kp, &entries, true, 0);
shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
})
}

Expand All @@ -106,7 +106,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
let data_shreds = shredder.entries_to_shreds(&kp, &entries, true, 0).0;
let (data_shreds, _) = shredder.entries_to_shreds(&kp, &entries, true, 0, 0);
bencher.iter(|| {
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
assert_ne!(raw.len(), 0);
Expand All @@ -133,6 +133,7 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
true, // is_last_in_slot
0, // next_code_index
)
.len();
})
Expand All @@ -145,6 +146,7 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
true, // is_last_in_slot
0, // next_code_index
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
Expand Down
1 change: 1 addition & 0 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ pub mod test {
&keypair,
&data_shreds[0..],
true, // is_last_in_slot
0, // next_code_index
&mut ProcessShredsStats::default(),
)
.unwrap();
Expand Down
13 changes: 10 additions & 3 deletions core/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
current_slot: Slot,
next_shred_index: u32,
next_code_index: u32,
shred_version: u16,
recent_blockhash: Option<Hash>,
prev_entry_hash: Option<Hash>,
Expand All @@ -46,6 +47,7 @@ impl BroadcastDuplicatesRun {
Self {
config,
next_shred_index: u32::MAX,
next_code_index: 0,
shred_version,
current_slot: 0,
recent_blockhash: None,
Expand Down Expand Up @@ -74,6 +76,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {

if bank.slot() != self.current_slot {
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
self.prev_entry_hash = None;
self.num_slots_broadcasted += 1;
Expand Down Expand Up @@ -154,22 +157,26 @@ impl BroadcastRun for BroadcastDuplicatesRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, _) = shredder.entries_to_shreds(
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
self.next_code_index,
);

self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
}
let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
let (original_last_data_shred, _) =
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index);
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index, self.next_code_index);

let (partition_last_data_shred, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index);
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index, self.next_code_index);

let sigs: Vec<_> = partition_last_data_shred.iter().map(|s| (s.signature(), s.index())).collect();
info!(
Expand Down
13 changes: 13 additions & 0 deletions core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(super) struct BroadcastFakeShredsRun {
last_blockhash: Hash,
partition: usize,
shred_version: u16,
next_code_index: u32,
}

impl BroadcastFakeShredsRun {
Expand All @@ -18,6 +19,7 @@ impl BroadcastFakeShredsRun {
last_blockhash: Hash::default(),
partition,
shred_version,
next_code_index: 0,
}
}
}
Expand Down Expand Up @@ -57,6 +59,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
next_shred_index,
self.next_code_index,
);

// If the last blockhash is default, a new block is being created
Expand All @@ -74,8 +77,18 @@ impl BroadcastRun for BroadcastFakeShredsRun {
&fake_entries,
last_tick_height == bank.max_tick_height(),
next_shred_index,
self.next_code_index,
);

if let Some(index) = coding_shreds
.iter()
.chain(&fake_coding_shreds)
.map(Shred::index)
.max()
{
self.next_code_index = index + 1;
}

// If it's the last tick, reset the last block hash to default
// this will cause next run to grab last bank's blockhash
if last_tick_height == bank.max_tick_height() {
Expand Down
1 change: 1 addition & 0 deletions core/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(super) struct ReceiveResults {
#[derive(Clone)]
pub struct UnfinishedSlotInfo {
pub next_shred_index: u32,
pub(crate) next_code_index: u32,
pub slot: Slot,
pub parent: Slot,
// Data shreds buffered to make a batch of size
Expand Down
13 changes: 10 additions & 3 deletions core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
good_shreds: Vec<Shred>,
current_slot: Slot,
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
}

Expand All @@ -29,6 +30,7 @@ impl FailEntryVerificationBroadcastRun {
good_shreds: vec![],
current_slot: 0,
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
}
}
Expand All @@ -50,6 +52,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {

if bank.slot() != self.current_slot {
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
}

Expand Down Expand Up @@ -83,22 +86,26 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, _) = shredder.entries_to_shreds(
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
self.next_code_index,
);

self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
}
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
let (good_last_data_shred, _) =
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index);
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index, self.next_code_index);

let (bad_last_data_shred, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index);
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index, self.next_code_index);

self.next_shred_index += 1;
(good_last_data_shred, bad_last_data_shred)
Expand Down
55 changes: 39 additions & 16 deletions core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,13 @@ impl StandardBroadcastRun {
Some(index) => index + 1,
None => next_shred_index,
};
let next_code_index = match &self.unfinished_slot {
Some(state) => state.next_code_index,
None => 0,
};
self.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index,
next_code_index,
slot,
parent: parent_slot,
data_shreds_buffer,
Expand Down Expand Up @@ -449,23 +454,40 @@ fn make_coding_shreds(
is_slot_end: bool,
stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
let data_shreds = match unfinished_slot {
None => Vec::default(),
Some(unfinished_slot) => {
let size = unfinished_slot.data_shreds_buffer.len();
// Consume a multiple of 32, unless this is the slot end.
let offset = if is_slot_end {
0
} else {
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
};
unfinished_slot
.data_shreds_buffer
.drain(0..size - offset)
.collect()
}
let unfinished_slot = match unfinished_slot {
None => return Vec::default(),
Some(state) => state,
};
Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, is_slot_end, stats).unwrap()
let data_shreds: Vec<_> = {
let size = unfinished_slot.data_shreds_buffer.len();
// Consume a multiple of 32, unless this is the slot end.
let offset = if is_slot_end {
0
} else {
size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
};
unfinished_slot
.data_shreds_buffer
.drain(0..size - offset)
.collect()
};
let shreds = Shredder::data_shreds_to_coding_shreds(
keypair,
&data_shreds,
is_slot_end,
unfinished_slot.next_code_index,
stats,
)
.unwrap();
if let Some(index) = shreds
.iter()
.filter(|shred| shred.is_code())
.map(Shred::index)
.max()
{
unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1);
}
shreds
}

impl BroadcastRun for StandardBroadcastRun {
Expand Down Expand Up @@ -582,6 +604,7 @@ mod test {
let parent = 0;
run.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index,
next_code_index: 17,
slot,
parent,
data_shreds_buffer: Vec::default(),
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ mod tests {
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
false, // is_last_in_slot
3, // next_code_index
);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
Expand Down
7 changes: 6 additions & 1 deletion core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,12 @@ mod test {
keypair: &Keypair,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
shredder.entries_to_shreds(keypair, entries, true, 0).0
let (data_shreds, _) = shredder.entries_to_shreds(
keypair, entries, true, // is_last_in_slot
0, // next_shred_index
0, // next_code_index
);
data_shreds
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ pub(crate) mod tests {
&entries,
true, // is_last_in_slot
next_shred_index,
next_shred_index, // next_code_index
);
data_shreds.swap_remove(0)
}
Expand Down
Loading