From 9a936e2097a6cde62cabbbaa848dc08c71621a00 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 19 Dec 2021 22:37:55 +0000 Subject: [PATCH 1/2] tracks erasure coding shreds' indices explicitly (#21822) The indices for erasure coding shreds are tied to data shreds: https://github.com/solana-labs/solana/blob/90f41fd9b/ledger/src/shred.rs#L921 However with the upcoming changes to erasure schema, there will be more erasure coding shreds than data shreds and we can no longer infer coding shreds indices from data shreds. The commit adds constructs to track coding shreds indices explicitly. (cherry picked from commit 65d59f4ef025410120e50129a4e97df2b6774f40) # Conflicts: # core/benches/retransmit_stage.rs # core/benches/shredder.rs # core/src/broadcast_stage/broadcast_duplicates_run.rs # core/src/broadcast_stage/broadcast_fake_shreds_run.rs # core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs # core/src/window_service.rs # ledger/src/blockstore.rs # ledger/src/shred.rs # ledger/tests/shred.rs --- core/benches/retransmit_stage.rs | 9 ++ core/benches/shredder.rs | 17 +++ core/src/broadcast_stage.rs | 1 + .../broadcast_duplicates_run.rs | 33 +++++ .../broadcast_fake_shreds_run.rs | 19 +++ core/src/broadcast_stage/broadcast_utils.rs | 1 + .../fail_entry_verification_broadcast_run.rs | 20 +++ .../broadcast_stage/standard_broadcast_run.rs | 55 +++++--- core/src/shred_fetch_stage.rs | 1 + core/src/window_service.rs | 10 ++ gossip/src/duplicate_shred.rs | 1 + ledger/src/blockstore.rs | 82 ++++++++++- ledger/src/shred.rs | 128 +++++++++++++++++- ledger/tests/shred.rs | 15 ++ 14 files changed, 370 insertions(+), 22 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index fb20b39e7cd39f..829676af692019 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -99,8 +99,17 @@ fn bench_retransmitter(bencher: &mut Bencher) { let keypair = Arc::new(Keypair::new()); let slot = 0; let parent = 0; +<<<<<<< HEAD let shredder = Shredder::new(slot, parent, keypair, 0, 0).unwrap(); let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); + let (mut data_shreds, _) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let num_packets = data_shreds.len(); diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index fff28699e0f6bf..7ec1c390ee038d 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -75,8 +75,13 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { +<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(&entries, true, 0); +======= + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + shredder.entries_to_shreds(&kp, &entries, true, 0, 0); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) }) } @@ -94,8 +99,13 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { let entries = make_large_unchained_entries(txs_per_entry, num_entries); // 1Mb bencher.iter(|| { +<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(&entries, true, 0); +======= + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + shredder.entries_to_shreds(&kp, &entries, true, 0, 0); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) }) } @@ -107,8 +117,13 @@ fn bench_deshredder(bencher: &mut Bencher) { let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); +<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp, 0, 0).unwrap(); let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + let (data_shreds, _) = shredder.entries_to_shreds(&kp, &entries, true, 0, 0); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) bencher.iter(|| { let raw = &mut Shredder::deshred(&data_shreds).unwrap(); assert_ne!(raw.len(), 0); @@ -135,6 +150,7 @@ fn bench_shredder_coding(bencher: &mut Bencher) { Shredder::generate_coding_shreds( &data_shreds[..symbol_count], true, // is_last_in_slot + 0, // next_code_index ) .len(); }) @@ -147,6 +163,7 @@ fn bench_shredder_decoding(bencher: &mut Bencher) { let coding_shreds = Shredder::generate_coding_shreds( &data_shreds[..symbol_count], true, // is_last_in_slot + 0, // next_code_index ); bencher.iter(|| { Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap(); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 2cd66d8bcce0cc..5eca71a176c884 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -492,6 +492,7 @@ pub mod test { &keypair, &data_shreds[0..], true, // is_last_in_slot + 0, // next_code_index &mut ProcessShredsStats::default(), ) .unwrap(); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 832d2bb0d614d5..88d7352e59cfdb 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -30,6 +30,7 @@ pub(super) struct BroadcastDuplicatesRun { last_duplicate_entry_hash: Hash, last_broadcast_slot: Slot, next_shred_index: u32, + next_code_index: u32, shred_version: u16, keypair: Arc, cluster_nodes_cache: Arc>, @@ -53,8 +54,12 @@ impl BroadcastDuplicatesRun { duplicate_queue: BlockhashQueue::default(), duplicate_entries_buffer: vec![], next_shred_index: u32::MAX, +<<<<<<< HEAD last_broadcast_slot: 0, last_duplicate_entry_hash: Hash::default(), +======= + next_code_index: 0, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) shred_version, keypair, cluster_nodes_cache, @@ -158,12 +163,21 @@ impl BroadcastRun for BroadcastDuplicatesRun { let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; +<<<<<<< HEAD if self.next_shred_index == u32::MAX { self.next_shred_index = blockstore .meta(bank.slot()) .expect("Database error") .map(|meta| meta.consumed) .unwrap_or(0) as u32 +======= + if bank.slot() != self.current_slot { + self.next_shred_index = 0; + self.next_code_index = 0; + self.current_slot = bank.slot(); + self.prev_entry_hash = None; + self.num_slots_broadcasted += 1; +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } // We were not the leader, but just became leader again @@ -182,12 +196,18 @@ impl BroadcastRun for BroadcastDuplicatesRun { .expect("Expected to create a new shredder"); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( +<<<<<<< HEAD +======= + keypair, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &receive_results.entries, last_tick_height == bank.max_tick_height(), self.next_shred_index, + self.next_code_index, ); self.next_shred_index += data_shreds.len() as u32; +<<<<<<< HEAD let (duplicate_entries, next_duplicate_shred_index) = self.queue_or_create_duplicate_entries(&bank, &receive_results); let (duplicate_data_shreds, duplicate_coding_shreds) = if !duplicate_entries.is_empty() { @@ -206,6 +226,19 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.duplicate_queue .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default()); } +======= + if let Some(index) = coding_shreds.iter().map(Shred::index).max() { + self.next_code_index = index + 1; + } + let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| { + let (original_last_data_shred, _) = + shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index, self.next_code_index); + + let (partition_last_data_shred, _) = + // Don't mark the last shred as last so that validators won't know that + // they've gotten all the shreds, and will continue trying to repair + shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index, self.next_code_index); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) // Partition network with duplicate and real shreds based on stake let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index a3f515030a5b0c..7dbc47ecc383eb 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -9,7 +9,11 @@ pub(super) struct BroadcastFakeShredsRun { last_blockhash: Hash, partition: usize, shred_version: u16, +<<<<<<< HEAD keypair: Arc, +======= + next_code_index: u32, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } impl BroadcastFakeShredsRun { @@ -18,7 +22,11 @@ impl BroadcastFakeShredsRun { last_blockhash: Hash::default(), partition, shred_version, +<<<<<<< HEAD keypair, +======= + next_code_index: 0, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } } } @@ -57,6 +65,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { &receive_results.entries, last_tick_height == bank.max_tick_height(), next_shred_index, + self.next_code_index, ); // If the last blockhash is default, a new block is being created @@ -73,8 +82,18 @@ impl BroadcastRun for BroadcastFakeShredsRun { &fake_entries, last_tick_height == bank.max_tick_height(), next_shred_index, + self.next_code_index, ); + if let Some(index) = coding_shreds + .iter() + .chain(&fake_coding_shreds) + .map(Shred::index) + .max() + { + self.next_code_index = index + 1; + } + // If it's the last tick, reset the last block hash to default // this will cause next run to grab last bank's blockhash if last_tick_height == bank.max_tick_height() { diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 2e3d832412e376..7a0e396d83cf37 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -20,6 +20,7 @@ pub(super) struct ReceiveResults { #[derive(Clone)] pub struct UnfinishedSlotInfo { pub next_shred_index: u32, + pub(crate) next_code_index: u32, pub slot: Slot, pub parent: Slot, // Data shreds buffered to make a batch of size diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 8915f9a26250ba..2436441171399e 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -16,6 +16,7 @@ pub(super) struct FailEntryVerificationBroadcastRun { good_shreds: Vec, current_slot: Slot, next_shred_index: u32, + next_code_index: u32, cluster_nodes_cache: Arc>, } @@ -31,6 +32,7 @@ impl FailEntryVerificationBroadcastRun { good_shreds: vec![], current_slot: 0, next_shred_index: 0, + next_code_index: 0, cluster_nodes_cache, } } @@ -51,6 +53,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { if bank.slot() != self.current_slot { self.next_shred_index = 0; + self.next_code_index = 0; self.current_slot = bank.slot(); } @@ -85,21 +88,38 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) .expect("Expected to create a new shredder"); +<<<<<<< HEAD let (data_shreds, _) = shredder.entries_to_shreds( +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + keypair, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, + self.next_code_index, ); self.next_shred_index += data_shreds.len() as u32; + if let Some(index) = coding_shreds.iter().map(Shred::index).max() { + self.next_code_index = index + 1; + } let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { let (good_last_data_shred, _) = +<<<<<<< HEAD shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); +======= + shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index, self.next_code_index); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let (bad_last_data_shred, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair +<<<<<<< HEAD shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); +======= + shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index, self.next_code_index); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) self.next_shred_index += 1; (good_last_data_shred, bad_last_data_shred) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ab5bfc696bdcf9..ad174539d44788 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -150,8 +150,13 @@ impl StandardBroadcastRun { Some(index) => index + 1, None => next_shred_index, }; + let next_code_index = match &self.unfinished_slot { + Some(state) => state.next_code_index, + None => 0, + }; self.unfinished_slot = Some(UnfinishedSlotInfo { next_shred_index, + next_code_index, slot, parent: parent_slot, data_shreds_buffer, @@ -455,23 +460,40 @@ fn make_coding_shreds( is_slot_end: bool, stats: &mut ProcessShredsStats, ) -> Vec { - let data_shreds = match unfinished_slot { - None => Vec::default(), - Some(unfinished_slot) => { - let size = unfinished_slot.data_shreds_buffer.len(); - // Consume a multiple of 32, unless this is the slot end. - let offset = if is_slot_end { - 0 - } else { - size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize - }; - unfinished_slot - .data_shreds_buffer - .drain(0..size - offset) - .collect() - } + let unfinished_slot = match unfinished_slot { + None => return Vec::default(), + Some(state) => state, }; - Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, is_slot_end, stats).unwrap() + let data_shreds: Vec<_> = { + let size = unfinished_slot.data_shreds_buffer.len(); + // Consume a multiple of 32, unless this is the slot end. + let offset = if is_slot_end { + 0 + } else { + size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + }; + unfinished_slot + .data_shreds_buffer + .drain(0..size - offset) + .collect() + }; + let shreds = Shredder::data_shreds_to_coding_shreds( + keypair, + &data_shreds, + is_slot_end, + unfinished_slot.next_code_index, + stats, + ) + .unwrap(); + if let Some(index) = shreds + .iter() + .filter(|shred| shred.is_code()) + .map(Shred::index) + .max() + { + unfinished_slot.next_code_index = unfinished_slot.next_code_index.max(index + 1); + } + shreds } impl BroadcastRun for StandardBroadcastRun { @@ -585,6 +607,7 @@ mod test { let parent = 0; run.unfinished_slot = Some(UnfinishedSlotInfo { next_shred_index, + next_code_index: 17, slot, parent, data_shreds_buffer: Vec::default(), diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 0c8b7a6f3cef78..e2b9895c469fd6 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -270,6 +270,7 @@ mod tests { let coding = solana_ledger::shred::Shredder::generate_coding_shreds( &[shred], false, // is_last_in_slot + 3, // next_code_index ); coding[0].copy_to_packet(&mut packet); ShredFetchStage::process_packet( diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 815d6d41d473e8..2670bfd01b9ee6 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -736,8 +736,18 @@ mod test { parent: Slot, keypair: &Arc, ) -> Vec { +<<<<<<< HEAD let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap(); shredder.entries_to_shreds(entries, true, 0).0 +======= + let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); + let (data_shreds, _) = shredder.entries_to_shreds( + keypair, entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); + data_shreds +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } #[test] diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 17684f0f9b48d5..d001b01f12dc14 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -338,6 +338,7 @@ pub(crate) mod tests { &entries, true, // is_last_in_slot next_shred_index, + next_shred_index, // next_code_index ); data_shreds.swap_remove(0) } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 42c7dbae9ded84..3959e2aec505a0 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1665,8 +1665,18 @@ impl Blockstore { 0 } }; +<<<<<<< HEAD let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(¤t_entries, true, start_index); +======= + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds( + keypair, + ¤t_entries, + true, // is_last_in_slot + start_index, // next_shred_index + start_index, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); shredder = Shredder::new( @@ -1686,8 +1696,18 @@ impl Blockstore { } if !slot_entries.is_empty() { +<<<<<<< HEAD let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds(&slot_entries, is_full_slot, 0); +======= + let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds( + keypair, + &slot_entries, + is_full_slot, + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); } @@ -3558,8 +3578,21 @@ pub fn create_new_ledger( let last_hash = entries.last().unwrap().hash; let version = solana_sdk::shred_version::version_from_hash(&last_hash); +<<<<<<< HEAD let shredder = Shredder::new(0, 0, Arc::new(Keypair::new()), 0, version).unwrap(); let shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let shredder = Shredder::new(0, 0, 0, version).unwrap(); + let shreds = shredder + .entries_to_shreds( + &Keypair::new(), + &entries, + true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ) + .0; +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) assert!(shreds.last().unwrap().last_in_slot()); blockstore.insert_shreds(shreds, None, false)?; @@ -3742,7 +3775,17 @@ pub fn entries_to_test_shreds( ) -> Vec { Shredder::new(slot, parent_slot, Arc::new(Keypair::new()), 0, version) .unwrap() +<<<<<<< HEAD .entries_to_shreds(&entries, is_full_slot, 0) +======= + .entries_to_shreds( + &Keypair::new(), + &entries, + is_full_slot, + 0, // next_shred_index, + 0, // next_code_index + ) +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) .0 } @@ -8103,8 +8146,19 @@ pub mod tests { ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); +<<<<<<< HEAD let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); +======= + let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &leader_keypair, + &entries, + true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new(&genesis_config)); @@ -8155,9 +8209,27 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); +<<<<<<< HEAD let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap(); let (shreds, _) = shredder.entries_to_shreds(&entries1, true, 0); let (duplicate_shreds, _) = shredder.entries_to_shreds(&entries2, true, 0); +======= + let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); + let (shreds, _) = shredder.entries_to_shreds( + &leader_keypair, + &entries1, + true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index, + ); + let (duplicate_shreds, _) = shredder.entries_to_shreds( + &leader_keypair, + &entries2, + true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let shred = shreds[0].clone(); let duplicate_shred = duplicate_shreds[0].clone(); let non_duplicate_shred = shred.clone(); @@ -8472,8 +8544,14 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let ledger = Blockstore::open(&ledger_path).unwrap(); - let coding1 = Shredder::generate_coding_shreds(&shreds, false); - let coding2 = Shredder::generate_coding_shreds(&shreds, true); + let coding1 = Shredder::generate_coding_shreds( + &shreds, false, // is_last_in_slot + 0, // next_code_index + ); + let coding2 = Shredder::generate_coding_shreds( + &shreds, true, // is_last_in_slot + 0, // next_code_index + ); for shred in &shreds { info!("shred {:?}", shred); } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 2ef8bbc6d3ab5b..e59db233309155 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -77,12 +77,16 @@ use { pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }, +<<<<<<< HEAD std::{ convert::{TryFrom, TryInto}, mem::size_of, ops::Deref, sync::Arc, }, +======= + std::{cell::RefCell, convert::TryInto, mem::size_of}, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) thiserror::Error, }; @@ -775,6 +779,7 @@ impl Shredder { entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, + next_code_index: u32, ) -> ( Vec, // data shreds Vec, // coding shreds @@ -788,9 +793,16 @@ impl Shredder { &mut stats, ); let coding_shreds = Self::data_shreds_to_coding_shreds( +<<<<<<< HEAD self.keypair.deref(), &data_shreds, is_last_in_slot, +======= + keypair, + &data_shreds, + is_last_in_slot, + next_code_index, +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &mut stats, ) .unwrap(); @@ -870,6 +882,7 @@ impl Shredder { keypair: &Keypair, data_shreds: &[Shred], is_last_in_slot: bool, + next_code_index: u32, process_stats: &mut ProcessShredsStats, ) -> Result> { if data_shreds.is_empty() { @@ -881,8 +894,26 @@ impl Shredder { thread_pool.borrow().install(|| { data_shreds .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize) - .flat_map(|shred_data_batch| { - Shredder::generate_coding_shreds(shred_data_batch, is_last_in_slot) + .enumerate() + .flat_map(|(i, shred_data_batch)| { + // Assumption here is that, for now, each fec block has + // as many coding shreds as data shreds (except for the + // last one in the slot). + // TODO: tie this more closely with + // generate_coding_shreds. + let next_code_index = next_code_index + .checked_add( + u32::try_from(i) + .unwrap() + .checked_mul(MAX_DATA_SHREDS_PER_FEC_BLOCK) + .unwrap(), + ) + .unwrap(); + Shredder::generate_coding_shreds( + shred_data_batch, + is_last_in_slot, + next_code_index, + ) }) .collect() }) @@ -940,7 +971,11 @@ impl Shredder { } /// Generates coding shreds for the data shreds in the current FEC set - pub fn generate_coding_shreds(data: &[Shred], is_last_in_slot: bool) -> Vec { + pub fn generate_coding_shreds( + data: &[Shred], + is_last_in_slot: bool, + next_code_index: u32, + ) -> Vec { const PAYLOAD_ENCODE_SIZE: usize = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS; let ShredCommonHeader { slot, @@ -976,9 +1011,10 @@ impl Shredder { .iter() .enumerate() .map(|(i, parity)| { + let index = next_code_index + u32::try_from(i).unwrap(); let mut shred = Shred::new_empty_coding( slot, - fec_set_index + i as u32, // shred index + index, fec_set_index, num_data, num_coding, @@ -1326,7 +1362,17 @@ pub mod tests { .saturating_sub(num_expected_data_shreds as usize) .max(num_expected_data_shreds as usize); let start_index = 0; +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, + &entries, + true, // is_last_in_slot + start_index, // next_shred_index + start_index, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as u64, num_expected_data_shreds); @@ -1396,8 +1442,16 @@ pub mod tests { }) .collect(); +<<<<<<< HEAD let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let (data_shreds, _) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let deserialized_shred = Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); assert_eq!(deserialized_shred, *data_shreds.last().unwrap()); @@ -1419,7 +1473,15 @@ pub mod tests { }) .collect(); +<<<<<<< HEAD let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let (data_shreds, _) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), 5); assert_eq!(Shred::reference_tick_from_data(&s.payload), 5); @@ -1446,7 +1508,15 @@ pub mod tests { }) .collect(); +<<<<<<< HEAD let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; +======= + let (data_shreds, _) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK); assert_eq!( @@ -1479,8 +1549,16 @@ pub mod tests { }) .collect(); +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) for (i, s) in data_shreds.iter().enumerate() { verify_test_data_shred( s, @@ -1531,6 +1609,7 @@ pub mod tests { &entries, is_last_in_slot, 0, // next_shred_index + 0, // next_code_index ); let num_coding_shreds = coding_shreds.len(); @@ -1657,7 +1736,15 @@ pub mod tests { // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds // and 2 missing coding shreds. Hint: should work let serialized_entries = bincode::serialize(&entries).unwrap(); +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 25); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 25, // next_shred_index, + 25, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) // We should have 10 shreds now assert_eq!(data_shreds.len(), num_data_shreds); @@ -1735,8 +1822,18 @@ pub mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, + &[entry], + is_last_in_slot, + next_shred_index, + next_shred_index, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let num_data_shreds = data_shreds.len(); let mut shreds = coding_shreds; shreds.extend(data_shreds.iter().cloned()); @@ -1789,7 +1886,15 @@ pub mod tests { }) .collect(); +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) assert!(!data_shreds .iter() .chain(coding_shreds.iter()) @@ -1837,8 +1942,18 @@ pub mod tests { .collect(); let start_index = 0x12; +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, + &entries, + true, // is_last_in_slot + start_index, // next_shred_index + start_index, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; data_shreds.iter().enumerate().for_each(|(i, s)| { let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32; @@ -1882,12 +1997,14 @@ pub mod tests { ); assert!(data_shreds.len() > MAX_DATA_SHREDS_PER_FEC_BLOCK as usize); + let next_code_index = data_shreds[0].index(); (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| { let coding_shreds = Shredder::data_shreds_to_coding_shreds( shredder.keypair.deref(), &data_shreds[..count], false, // is_last_in_slot + next_code_index, &mut stats, ) .unwrap(); @@ -1896,6 +2013,7 @@ pub mod tests { shredder.keypair.deref(), &data_shreds[..count], true, // is_last_in_slot + next_code_index, &mut stats, ) .unwrap(); @@ -1909,6 +2027,7 @@ pub mod tests { shredder.keypair.deref(), &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], false, // is_last_in_slot + next_code_index, &mut stats, ) .unwrap(); @@ -1920,6 +2039,7 @@ pub mod tests { shredder.keypair.deref(), &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], true, // is_last_in_slot + next_code_index, &mut stats, ) .unwrap(); diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 68dcf68163ca51..a85dafddb10a0b 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -50,7 +50,15 @@ fn test_multi_fec_block_coding() { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, &entries, true, // is_last_in_slot + 0, // next_shred_index + 0, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as usize, num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds); @@ -220,8 +228,15 @@ fn setup_different_sized_fec_blocks( let total_num_data_shreds: usize = 2 * num_shreds_per_iter; for i in 0..2 { let is_last = i == 1; +<<<<<<< HEAD let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, is_last, next_index); +======= + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( + &keypair, &entries, is_last, next_index, // next_shred_index + next_index, // next_code_index + ); +>>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) for shred in &data_shreds { if (shred.index() as usize) == total_num_data_shreds - 1 { assert!(shred.data_complete()); From 48e3405af45d7923c6f2e983c34ef2c59061e05c Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sat, 5 Feb 2022 18:39:40 -0500 Subject: [PATCH 2/2] removes mergify merge conflicts --- core/benches/retransmit_stage.rs | 7 +- core/benches/shredder.rs | 21 +----- .../broadcast_duplicates_run.rs | 36 ++-------- .../broadcast_fake_shreds_run.rs | 6 -- .../fail_entry_verification_broadcast_run.rs | 17 +---- core/src/window_service.rs | 7 +- ledger/src/blockstore.rs | 49 ++------------ ledger/src/shred.rs | 65 ++----------------- ledger/tests/shred.rs | 13 +--- 9 files changed, 24 insertions(+), 197 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 829676af692019..579ddb9ab7e81c 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -99,17 +99,12 @@ fn bench_retransmitter(bencher: &mut Bencher) { let keypair = Arc::new(Keypair::new()); let slot = 0; let parent = 0; -<<<<<<< HEAD let shredder = Shredder::new(slot, parent, keypair, 0, 0).unwrap(); - let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; -======= - let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); let (mut data_shreds, _) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let num_packets = data_shreds.len(); diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 7ec1c390ee038d..2b962ace3eed81 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -75,13 +75,8 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { -<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(&entries, true, 0); -======= - let shredder = Shredder::new(1, 0, 0, 0).unwrap(); - shredder.entries_to_shreds(&kp, &entries, true, 0, 0); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) + shredder.entries_to_shreds(&entries, true, 0, 0); }) } @@ -99,13 +94,8 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { let entries = make_large_unchained_entries(txs_per_entry, num_entries); // 1Mb bencher.iter(|| { -<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(&entries, true, 0); -======= - let shredder = Shredder::new(1, 0, 0, 0).unwrap(); - shredder.entries_to_shreds(&kp, &entries, true, 0, 0); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) + shredder.entries_to_shreds(&entries, true, 0, 0); }) } @@ -117,13 +107,8 @@ fn bench_deshredder(bencher: &mut Bencher) { let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); -<<<<<<< HEAD let shredder = Shredder::new(1, 0, kp, 0, 0).unwrap(); - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; -======= - let shredder = Shredder::new(1, 0, 0, 0).unwrap(); - let (data_shreds, _) = shredder.entries_to_shreds(&kp, &entries, true, 0, 0); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) + let (data_shreds, _) = shredder.entries_to_shreds(&entries, true, 0, 0); bencher.iter(|| { let raw = &mut Shredder::deshred(&data_shreds).unwrap(); assert_ne!(raw.len(), 0); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 88d7352e59cfdb..9d601237d3c17d 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -54,12 +54,9 @@ impl BroadcastDuplicatesRun { duplicate_queue: BlockhashQueue::default(), duplicate_entries_buffer: vec![], next_shred_index: u32::MAX, -<<<<<<< HEAD + next_code_index: 0, last_broadcast_slot: 0, last_duplicate_entry_hash: Hash::default(), -======= - next_code_index: 0, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) shred_version, keypair, cluster_nodes_cache, @@ -163,21 +160,12 @@ impl BroadcastRun for BroadcastDuplicatesRun { let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; -<<<<<<< HEAD if self.next_shred_index == u32::MAX { self.next_shred_index = blockstore .meta(bank.slot()) .expect("Database error") .map(|meta| meta.consumed) .unwrap_or(0) as u32 -======= - if bank.slot() != self.current_slot { - self.next_shred_index = 0; - self.next_code_index = 0; - self.current_slot = bank.slot(); - self.prev_entry_hash = None; - self.num_slots_broadcasted += 1; ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } // We were not the leader, but just became leader again @@ -196,10 +184,6 @@ impl BroadcastRun for BroadcastDuplicatesRun { .expect("Expected to create a new shredder"); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( -<<<<<<< HEAD -======= - keypair, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &receive_results.entries, last_tick_height == bank.max_tick_height(), self.next_shred_index, @@ -207,7 +191,9 @@ impl BroadcastRun for BroadcastDuplicatesRun { ); self.next_shred_index += data_shreds.len() as u32; -<<<<<<< HEAD + if let Some(index) = coding_shreds.iter().map(Shred::index).max() { + self.next_code_index = index + 1; + } let (duplicate_entries, next_duplicate_shred_index) = self.queue_or_create_duplicate_entries(&bank, &receive_results); let (duplicate_data_shreds, duplicate_coding_shreds) = if !duplicate_entries.is_empty() { @@ -215,6 +201,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { &duplicate_entries, last_tick_height == bank.max_tick_height(), next_duplicate_shred_index, + next_duplicate_shred_index, ) } else { (vec![], vec![]) @@ -226,19 +213,6 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.duplicate_queue .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default()); } -======= - if let Some(index) = coding_shreds.iter().map(Shred::index).max() { - self.next_code_index = index + 1; - } - let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| { - let (original_last_data_shred, _) = - shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index, self.next_code_index); - - let (partition_last_data_shred, _) = - // Don't mark the last shred as last so that validators won't know that - // they've gotten all the shreds, and will continue trying to repair - shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index, self.next_code_index); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) // Partition network with duplicate and real shreds based on stake let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 7dbc47ecc383eb..a41c2129306403 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -9,11 +9,8 @@ pub(super) struct BroadcastFakeShredsRun { last_blockhash: Hash, partition: usize, shred_version: u16, -<<<<<<< HEAD keypair: Arc, -======= next_code_index: u32, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } impl BroadcastFakeShredsRun { @@ -22,11 +19,8 @@ impl BroadcastFakeShredsRun { last_blockhash: Hash::default(), partition, shred_version, -<<<<<<< HEAD keypair, -======= next_code_index: 0, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } } } diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 2436441171399e..829d559b7c3911 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -88,12 +88,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { ) .expect("Expected to create a new shredder"); -<<<<<<< HEAD - let (data_shreds, _) = shredder.entries_to_shreds( -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - keypair, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, @@ -106,20 +101,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { } let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { let (good_last_data_shred, _) = -<<<<<<< HEAD - shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); -======= - shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index, self.next_code_index); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) + shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index, self.next_code_index); let (bad_last_data_shred, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair -<<<<<<< HEAD - shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); -======= - shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index, self.next_code_index); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) + shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index, self.next_code_index); self.next_shred_index += 1; (good_last_data_shred, bad_last_data_shred) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 2670bfd01b9ee6..74d8b0a544a7e4 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -736,18 +736,13 @@ mod test { parent: Slot, keypair: &Arc, ) -> Vec { -<<<<<<< HEAD let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(entries, true, 0).0 -======= - let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); let (data_shreds, _) = shredder.entries_to_shreds( - keypair, entries, true, // is_last_in_slot + entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); data_shreds ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) } #[test] diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3959e2aec505a0..97a3a521645849 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1665,18 +1665,12 @@ impl Blockstore { 0 } }; -<<<<<<< HEAD - let (mut data_shreds, mut coding_shreds) = - shredder.entries_to_shreds(¤t_entries, true, start_index); -======= let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds( - keypair, ¤t_entries, true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); shredder = Shredder::new( @@ -1696,18 +1690,12 @@ impl Blockstore { } if !slot_entries.is_empty() { -<<<<<<< HEAD - let (mut data_shreds, mut coding_shreds) = - shredder.entries_to_shreds(&slot_entries, is_full_slot, 0); -======= let (mut data_shreds, mut coding_shreds) = shredder.entries_to_shreds( - keypair, &slot_entries, is_full_slot, 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); } @@ -3578,21 +3566,14 @@ pub fn create_new_ledger( let last_hash = entries.last().unwrap().hash; let version = solana_sdk::shred_version::version_from_hash(&last_hash); -<<<<<<< HEAD let shredder = Shredder::new(0, 0, Arc::new(Keypair::new()), 0, version).unwrap(); - let shreds = shredder.entries_to_shreds(&entries, true, 0).0; -======= - let shredder = Shredder::new(0, 0, 0, version).unwrap(); let shreds = shredder .entries_to_shreds( - &Keypair::new(), - &entries, - true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ) .0; ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) assert!(shreds.last().unwrap().last_in_slot()); blockstore.insert_shreds(shreds, None, false)?; @@ -3775,17 +3756,12 @@ pub fn entries_to_test_shreds( ) -> Vec { Shredder::new(slot, parent_slot, Arc::new(Keypair::new()), 0, version) .unwrap() -<<<<<<< HEAD - .entries_to_shreds(&entries, is_full_slot, 0) -======= .entries_to_shreds( - &Keypair::new(), &entries, is_full_slot, 0, // next_shred_index, 0, // next_code_index ) ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) .0 } @@ -8146,19 +8122,12 @@ pub mod tests { ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); -<<<<<<< HEAD let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap(); - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); -======= - let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &leader_keypair, - &entries, - true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new(&genesis_config)); @@ -8209,27 +8178,17 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); -<<<<<<< HEAD let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap(); - let (shreds, _) = shredder.entries_to_shreds(&entries1, true, 0); - let (duplicate_shreds, _) = shredder.entries_to_shreds(&entries2, true, 0); -======= - let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); let (shreds, _) = shredder.entries_to_shreds( - &leader_keypair, - &entries1, - true, // is_last_in_slot + &entries1, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index, ); let (duplicate_shreds, _) = shredder.entries_to_shreds( - &leader_keypair, - &entries2, - true, // is_last_in_slot + &entries2, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let shred = shreds[0].clone(); let duplicate_shred = duplicate_shreds[0].clone(); let non_duplicate_shred = shred.clone(); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index e59db233309155..53c495e68a821d 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -77,16 +77,12 @@ use { pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }, -<<<<<<< HEAD std::{ convert::{TryFrom, TryInto}, mem::size_of, ops::Deref, sync::Arc, }, -======= - std::{cell::RefCell, convert::TryInto, mem::size_of}, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) thiserror::Error, }; @@ -793,16 +789,10 @@ impl Shredder { &mut stats, ); let coding_shreds = Self::data_shreds_to_coding_shreds( -<<<<<<< HEAD self.keypair.deref(), &data_shreds, is_last_in_slot, -======= - keypair, - &data_shreds, - is_last_in_slot, next_code_index, ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) &mut stats, ) .unwrap(); @@ -1362,17 +1352,12 @@ pub mod tests { .saturating_sub(num_expected_data_shreds as usize) .max(num_expected_data_shreds as usize); let start_index = 0; -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as u64, num_expected_data_shreds); @@ -1442,16 +1427,11 @@ pub mod tests { }) .collect(); -<<<<<<< HEAD - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; - -======= let (data_shreds, _) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let deserialized_shred = Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); assert_eq!(deserialized_shred, *data_shreds.last().unwrap()); @@ -1473,15 +1453,11 @@ pub mod tests { }) .collect(); -<<<<<<< HEAD - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; -======= let (data_shreds, _) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), 5); assert_eq!(Shred::reference_tick_from_data(&s.payload), 5); @@ -1508,15 +1484,11 @@ pub mod tests { }) .collect(); -<<<<<<< HEAD - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; -======= let (data_shreds, _) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK); assert_eq!( @@ -1549,16 +1521,11 @@ pub mod tests { }) .collect(); -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); - -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) for (i, s) in data_shreds.iter().enumerate() { verify_test_data_shred( s, @@ -1736,15 +1703,11 @@ pub mod tests { // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds // and 2 missing coding shreds. Hint: should work let serialized_entries = bincode::serialize(&entries).unwrap(); -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 25); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 25, // next_shred_index, 25, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) // We should have 10 shreds now assert_eq!(data_shreds.len(), num_data_shreds); @@ -1822,18 +1785,12 @@ pub mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); -<<<<<<< HEAD - let (data_shreds, coding_shreds) = - shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &[entry], is_last_in_slot, next_shred_index, next_shred_index, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let num_data_shreds = data_shreds.len(); let mut shreds = coding_shreds; shreds.extend(data_shreds.iter().cloned()); @@ -1886,15 +1843,11 @@ pub mod tests { }) .collect(); -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) assert!(!data_shreds .iter() .chain(coding_shreds.iter()) @@ -1942,18 +1895,12 @@ pub mod tests { .collect(); let start_index = 0x12; -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, start_index); - -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; data_shreds.iter().enumerate().for_each(|(i, s)| { let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32; diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index a85dafddb10a0b..c95ffae319fe38 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -50,15 +50,11 @@ fn test_multi_fec_block_coding() { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); -<<<<<<< HEAD - let (data_shreds, coding_shreds) = shredder.entries_to_shreds(&entries, true, 0); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, true, // is_last_in_slot + &entries, true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) let next_index = data_shreds.last().unwrap().index() + 1; assert_eq!(next_index as usize, num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds); @@ -228,15 +224,10 @@ fn setup_different_sized_fec_blocks( let total_num_data_shreds: usize = 2 * num_shreds_per_iter; for i in 0..2 { let is_last = i == 1; -<<<<<<< HEAD - let (data_shreds, coding_shreds) = - shredder.entries_to_shreds(&entries, is_last, next_index); -======= let (data_shreds, coding_shreds) = shredder.entries_to_shreds( - &keypair, &entries, is_last, next_index, // next_shred_index + &entries, is_last, next_index, // next_shred_index next_index, // next_code_index ); ->>>>>>> 65d59f4ef (tracks erasure coding shreds' indices explicitly (#21822)) for shred in &data_shreds { if (shred.index() as usize) == total_num_data_shreds - 1 { assert!(shred.data_complete());