From 57ec9e262b1b4b3aeb71dd9f6aa99c54d8657bf7 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 31 Aug 2022 10:00:35 -0400
Subject: [PATCH] caches reed-solomon encoder/decoder instance

ReedSolomon::new(...) initializes a matrix and an inversion-tree:
https://github.com/rust-rse/reed-solomon-erasure/blob/eb1f66f47/src/core.rs#L450-L458

To avoid redoing that work on every call, this commit caches the
reed-solomon encoder/decoder instance for each
(data_shards, parity_shards) pair.
---
 core/benches/retransmit_stage.rs              |   3 +-
 core/benches/shredder.rs                      |  16 ++-
 core/src/broadcast_stage.rs                   |   3 +-
 .../broadcast_duplicates_run.rs               |   7 +-
 .../broadcast_fake_shreds_run.rs              |   6 +-
 .../fail_entry_verification_broadcast_run.rs  |   7 +-
 .../broadcast_stage/standard_broadcast_run.rs |   6 +-
 core/src/shred_fetch_stage.rs                 |   3 +-
 core/src/window_service.rs                    |   7 +-
 gossip/src/duplicate_shred.rs                 |   3 +-
 ledger/src/blockstore.rs                      |  34 +++++-
 ledger/src/shred.rs                           |  13 ++-
 ledger/src/shred/merkle.rs                    |  27 +++--
 ledger/src/shredder.rs                        | 110 +++++++++++++++---
 ledger/tests/shred.rs                         |  14 ++-
 15 files changed, 211 insertions(+), 48 deletions(-)

diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index 1b460ea03a522e..3730e42fef97c1 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -15,7 +15,7 @@ use {
     solana_ledger::{
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
         leader_schedule_cache::LeaderScheduleCache,
-        shred::{ProcessShredsStats, Shredder},
+        shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
     },
     solana_measure::measure::Measure,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -106,6 +106,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         true, // is_last_in_slot
         0,    // next_shred_index
         0,    // next_code_index
+        &ReedSolomonCache::default(),
         &mut ProcessShredsStats::default(),
     );

diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 8df66e9b47f362..e7fb1f733660e7 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -8,8 +8,8 @@ use {
     raptorq::{Decoder, Encoder},
     solana_entry::entry::{create_ticks, Entry},
     solana_ledger::shred::{
-        max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
-        Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
+        max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache,
+        Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
     },
     solana_perf::test_tx,
     solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@@ -52,6 +52,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
         true, // is_last_in_slot
         0,    // next_shred_index
         0,    // next_code_index
+        &ReedSolomonCache::default(),
         &mut ProcessShredsStats::default(),
     );
     assert!(data_shreds.len() >= num_shreds);
@@ -77,6 +78,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
     // ~1Mb
     let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
     let entries = create_ticks(num_ticks, 0, Hash::default());
+    let reed_solomon_cache = ReedSolomonCache::default();
     bencher.iter(|| {
         let shredder = Shredder::new(1, 0, 0, 0).unwrap();
         shredder.entries_to_shreds(
@@ -85,6 +87,7 @@
             true,
             0,
             0,
+            &reed_solomon_cache,
             &mut ProcessShredsStats::default(),
         );
     })
@@ -102,6 +105,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
         Some(shred_size),
     );
     let entries = make_large_unchained_entries(txs_per_entry, num_entries);
+    let
reed_solomon_cache = ReedSolomonCache::default(); // 1Mb bencher.iter(|| { let shredder = Shredder::new(1, 0, 0, 0).unwrap(); @@ -111,6 +115,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { true, 0, 0, + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); }) @@ -131,6 +136,7 @@ fn bench_deshredder(bencher: &mut Bencher) { true, 0, 0, + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); bencher.iter(|| { @@ -155,10 +161,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) { fn bench_shredder_coding(bencher: &mut Bencher) { let symbol_count = DATA_SHREDS_PER_FEC_BLOCK; let data_shreds = make_shreds(symbol_count); + let reed_solomon_cache = ReedSolomonCache::default(); bencher.iter(|| { Shredder::generate_coding_shreds( &data_shreds[..symbol_count], 0, // next_code_index + &reed_solomon_cache, ) .len(); }) @@ -168,12 +176,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) { fn bench_shredder_decoding(bencher: &mut Bencher) { let symbol_count = DATA_SHREDS_PER_FEC_BLOCK; let data_shreds = make_shreds(symbol_count); + let reed_solomon_cache = ReedSolomonCache::default(); let coding_shreds = Shredder::generate_coding_shreds( &data_shreds[..symbol_count], 0, // next_code_index + &reed_solomon_cache, ); bencher.iter(|| { - Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap(); + Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap(); }) } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 68729600d1983e..2b227223b7daaf 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -450,7 +450,7 @@ pub mod test { blockstore::Blockstore, genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path, - shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder}, + shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder}, }, solana_runtime::bank::Bank, solana_sdk::{ @@ -488,6 +488,7 @@ pub mod test { true, // is_last_in_slot 0, // next_shred_index, 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); ( diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 9e60d6c8196cfe..c1e9e0e0578817 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -4,7 +4,7 @@ use { itertools::Itertools, solana_entry::entry::Entry, solana_gossip::contact_info::ContactInfo, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{ hash::Hash, signature::{Keypair, Signature, Signer}, @@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun { cluster_nodes_cache: Arc>, original_last_data_shreds: Arc>>, partition_last_data_shreds: Arc>>, + reed_solomon_cache: Arc, } impl BroadcastDuplicatesRun { @@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun { cluster_nodes_cache, original_last_data_shreds: Arc::>>::default(), partition_last_data_shreds: Arc::>>::default(), + reed_solomon_cache: Arc::::default(), } } } @@ -163,6 +165,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -178,6 +181,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { true, self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut 
ProcessShredsStats::default(), ); // Don't mark the last shred as last so that validators won't @@ -189,6 +193,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { true, self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); let sigs: Vec<_> = partition_last_data_shred diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 743cc9b072d6d0..e6a85ff4f7d0e1 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -1,7 +1,7 @@ use { super::*, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{hash::Hash, signature::Keypair}, }; @@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun { partition: usize, shred_version: u16, next_code_index: u32, + reed_solomon_cache: Arc, } impl BroadcastFakeShredsRun { @@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun { partition, shred_version, next_code_index: 0, + reed_solomon_cache: Arc::::default(), } } } @@ -60,6 +62,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { last_tick_height == bank.max_tick_height(), next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -79,6 +82,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { last_tick_height == bank.max_tick_height(), next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index c8fd4acc9e1cfc..e161ecfe99ff2e 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,7 +1,7 @@ use { super::*, crate::cluster_nodes::ClusterNodesCache, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{hash::Hash, signature::Keypair}, std::{thread::sleep, time::Duration}, }; @@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun { next_shred_index: u32, next_code_index: u32, cluster_nodes_cache: Arc>, + reed_solomon_cache: Arc, } impl FailEntryVerificationBroadcastRun { @@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun { next_shred_index: 0, next_code_index: 0, cluster_nodes_cache, + reed_solomon_cache: Arc::::default(), } } } @@ -91,6 +93,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); @@ -105,6 +108,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { true, self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); // Don't mark the last shred as last so that validators won't know @@ -116,6 +120,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { false, self.next_shred_index, self.next_code_index, + &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); self.next_shred_index += 1; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 87715f8b16c82b..cb096d7322044f 100644 --- 
a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -9,7 +9,7 @@ use { broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache, }, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, solana_sdk::{ signature::Keypair, timing::{duration_as_us, AtomicInterval}, @@ -29,6 +29,7 @@ pub struct StandardBroadcastRun { last_datapoint_submit: Arc, num_batches: usize, cluster_nodes_cache: Arc>, + reed_solomon_cache: Arc, } impl StandardBroadcastRun { @@ -48,6 +49,7 @@ impl StandardBroadcastRun { last_datapoint_submit: Arc::default(), num_batches: 0, cluster_nodes_cache, + reed_solomon_cache: Arc::::default(), } } @@ -76,6 +78,7 @@ impl StandardBroadcastRun { true, // is_last_in_slot, state.next_shred_index, state.next_code_index, + &self.reed_solomon_cache, stats, ); self.report_and_reset_stats(true); @@ -124,6 +127,7 @@ impl StandardBroadcastRun { is_slot_end, next_shred_index, next_code_index, + &self.reed_solomon_cache, process_stats, ); let next_shred_index = match data_shreds.iter().map(Shred::index).max() { diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 25c9b00cdf7c76..cde36c8389b2cc 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -251,7 +251,7 @@ mod tests { super::*, solana_ledger::{ blockstore::MAX_DATA_SHREDS_PER_SLOT, - shred::{Shred, ShredFlags}, + shred::{ReedSolomonCache, Shred, ShredFlags}, }, }; @@ -294,6 +294,7 @@ mod tests { let coding = solana_ledger::shred::Shredder::generate_coding_shreds( &[shred], 3, // next_code_index + &ReedSolomonCache::default(), ); coding[0].copy_to_packet(&mut packet); assert!(!should_discard_packet( diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 2f7983bd86d7eb..18a69a316b754e 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -16,7 +16,7 @@ use { solana_ledger::{ blockstore::{Blockstore, BlockstoreInsertionMetrics}, leader_schedule_cache::LeaderScheduleCache, - shred::{self, Nonce, Shred}, + shred::{self, Nonce, ReedSolomonCache, Shred}, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, @@ -220,6 +220,7 @@ fn run_insert( completed_data_sets_sender: &CompletedDataSetsSender, retransmit_sender: &Sender>, outstanding_requests: &RwLock, + reed_solomon_cache: &ReedSolomonCache, ) -> Result<()> where F: Fn(Shred), @@ -282,6 +283,7 @@ where false, // is_trusted Some(retransmit_sender), &handle_duplicate, + reed_solomon_cache, metrics, )?; for index in inserted_indices { @@ -411,6 +413,7 @@ impl WindowService { .thread_name(|i| format!("solWinInsert{:02}", i)) .build() .unwrap(); + let reed_solomon_cache = ReedSolomonCache::default(); Builder::new() .name("solWinInsert".to_string()) .spawn(move || { @@ -432,6 +435,7 @@ impl WindowService { &completed_data_sets_sender, &retransmit_sender, &outstanding_requests, + &reed_solomon_cache, ) { ws_metrics.record_error(&e); if Self::should_exit_on_error(e, &handle_error) { @@ -506,6 +510,7 @@ mod test { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 6ce635159101ab..c1dd1470b17905 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ 
-284,7 +284,7 @@ pub(crate) mod tests { super::*, rand::Rng, solana_entry::entry::Entry, - solana_ledger::shred::{ProcessShredsStats, Shredder}, + solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, solana_sdk::{ hash, signature::{Keypair, Signer}, @@ -342,6 +342,7 @@ pub(crate) mod tests { true, // is_last_in_slot next_shred_index, next_shred_index, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.swap_remove(0) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 5dbc072f29d5bf..b34bd61506ec29 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -17,8 +17,8 @@ use { leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, shred::{ - self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, Shred, ShredData, - ShredId, ShredType, Shredder, + self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache, + Shred, ShredData, ShredId, ShredType, Shredder, }, slot_stats::{ShredSource, SlotsStats}, }, @@ -628,6 +628,7 @@ impl Blockstore { recovered_shreds: &mut Vec, data_cf: &LedgerColumn, code_cf: &LedgerColumn, + reed_solomon_cache: &ReedSolomonCache, ) { // Find shreds for this erasure set and try recovery let slot = index.slot; @@ -646,7 +647,7 @@ impl Blockstore { code_cf, )) .collect(); - if let Ok(mut result) = shred::recover(available_shreds) { + if let Ok(mut result) = shred::recover(available_shreds, reed_solomon_cache) { Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len()); recovered_shreds.append(&mut result); } else { @@ -706,6 +707,7 @@ impl Blockstore { erasure_metas: &HashMap, index_working_set: &mut HashMap, prev_inserted_shreds: &HashMap, + reed_solomon_cache: &ReedSolomonCache, ) -> Vec { let data_cf = db.column::(); let code_cf = db.column::(); @@ -728,6 +730,7 @@ impl Blockstore { &mut recovered_shreds, &data_cf, &code_cf, + reed_solomon_cache, ); } ErasureMetaStatus::DataFull => { @@ -806,6 +809,7 @@ impl Blockstore { is_trusted: bool, retransmit_sender: Option<&Sender>>>, handle_duplicate: &F, + reed_solomon_cache: &ReedSolomonCache, metrics: &mut BlockstoreInsertionMetrics, ) -> Result<(Vec, Vec)> where @@ -893,6 +897,7 @@ impl Blockstore { &erasure_metas, &mut index_working_set, &just_inserted_shreds, + reed_solomon_cache, ); metrics.num_recovered += recovered_shreds @@ -1092,6 +1097,7 @@ impl Blockstore { is_trusted, None, // retransmit-sender &|_| {}, // handle-duplicates + &ReedSolomonCache::default(), &mut BlockstoreInsertionMetrics::default(), ) } @@ -1712,6 +1718,7 @@ impl Blockstore { let mut shredder = Shredder::new(current_slot, parent_slot, 0, version).unwrap(); let mut all_shreds = vec![]; let mut slot_entries = vec![]; + let reed_solomon_cache = ReedSolomonCache::default(); // Find all the entries for start_slot for entry in entries.into_iter() { if remaining_ticks_in_slot == 0 { @@ -1732,6 +1739,7 @@ impl Blockstore { true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); all_shreds.append(&mut data_shreds); @@ -1758,6 +1766,7 @@ impl Blockstore { is_full_slot, 0, // next_shred_index 0, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); all_shreds.append(&mut data_shreds); @@ -3856,6 +3865,7 @@ pub fn create_new_ledger( true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut 
ProcessShredsStats::default(), ); assert!(shreds.last().unwrap().last_in_slot()); @@ -4120,6 +4130,7 @@ pub fn entries_to_test_shreds( is_full_slot, 0, // next_shred_index, 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ) .0 @@ -8569,6 +8580,7 @@ pub mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); @@ -8623,6 +8635,7 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); + let reed_solomon_cache = ReedSolomonCache::default(); let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); let (shreds, _) = shredder.entries_to_shreds( &leader_keypair, @@ -8630,6 +8643,7 @@ pub mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index, + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let (duplicate_shreds, _) = shredder.entries_to_shreds( @@ -8638,6 +8652,7 @@ pub mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let shred = shreds[0].clone(); @@ -8978,8 +8993,17 @@ pub mod tests { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0); - let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1); + let reed_solomon_cache = ReedSolomonCache::default(); + let coding1 = Shredder::generate_coding_shreds( + &shreds, + 0, // next_code_index + &reed_solomon_cache, + ); + let coding2 = Shredder::generate_coding_shreds( + &shreds, + 1, // next_code_index + &reed_solomon_cache, + ); for shred in &shreds { info!("shred {:?}", shred); } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index aea429c5cbcf6d..e7bb2b34def094 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -75,7 +75,7 @@ pub use { shred_data::ShredData, stats::{ProcessShredsStats, ShredFetchStats}, }, - crate::shredder::Shredder, + crate::shredder::{ReedSolomonCache, Shredder}, }; mod common; @@ -714,20 +714,25 @@ impl TryFrom for ShredVariant { } } -pub(crate) fn recover(shreds: Vec) -> Result, Error> { +pub(crate) fn recover( + shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, +) -> Result, Error> { match shreds .first() .ok_or(TooFewShardsPresent)? .common_header() .shred_variant { - ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds), + ShredVariant::LegacyData | ShredVariant::LegacyCode => { + Shredder::try_recovery(shreds, reed_solomon_cache) + } ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => { let shreds = shreds .into_iter() .map(merkle::Shred::try_from) .collect::>()?; - Ok(merkle::recover(shreds)? + Ok(merkle::recover(shreds, reed_solomon_cache)? 
.into_iter() .map(Shred::from) .collect()) diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index 9d0482b95354a5..cb84474e3511ff 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -12,7 +12,7 @@ use { SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADERS, SIZE_OF_SIGNATURE, }, - shredder::ReedSolomon, + shredder::ReedSolomonCache, }, reed_solomon_erasure::Error::{InvalidIndex, TooFewParityShards, TooFewShards}, solana_perf::packet::deserialize_from_with_limit, @@ -589,7 +589,10 @@ fn make_merkle_branch( Some(MerkleBranch { root, proof }) } -pub(super) fn recover(mut shreds: Vec) -> Result, Error> { +pub(super) fn recover( + mut shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, +) -> Result, Error> { // Grab {common, coding} headers from first coding shred. let headers = shreds.iter().find_map(|shred| { let shred = match shred { @@ -666,7 +669,9 @@ pub(super) fn recover(mut shreds: Vec) -> Result, Error> { .iter() .map(|shred| Some(shred.as_ref()?.erasure_shard_as_slice().ok()?.to_vec())) .collect(); - ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct(&mut shards)?; + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds)? + .reconstruct(&mut shards)?; let mask: Vec<_> = shreds.iter().map(Option::is_some).collect(); // Reconstruct code and data shreds from erasure encoded shards. let mut shreds: Vec<_> = shreds @@ -847,9 +852,15 @@ mod test { #[test_case(73)] fn test_recover_merkle_shreds(num_shreds: usize) { let mut rng = rand::thread_rng(); + let reed_solomon_cache = ReedSolomonCache::default(); for num_data_shreds in 1..num_shreds { let num_coding_shreds = num_shreds - num_data_shreds; - run_recover_merkle_shreds(&mut rng, num_data_shreds, num_coding_shreds); + run_recover_merkle_shreds( + &mut rng, + num_data_shreds, + num_coding_shreds, + &reed_solomon_cache, + ); } } @@ -857,6 +868,7 @@ mod test { rng: &mut R, num_data_shreds: usize, num_coding_shreds: usize, + reed_solomon_cache: &ReedSolomonCache, ) { let keypair = Keypair::generate(rng); let num_shreds = num_data_shreds + num_coding_shreds; @@ -910,7 +922,8 @@ mod test { .collect::>() .unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds]; - ReedSolomon::new(num_data_shreds, num_coding_shreds) + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); @@ -970,12 +983,12 @@ mod test { ) }) { assert_matches!( - recover(shreds), + recover(shreds, reed_solomon_cache), Err(Error::ErasureError(TooFewParityShards)) ); continue; } - let recovered_shreds = recover(shreds).unwrap(); + let recovered_shreds = recover(shreds, reed_solomon_cache).unwrap(); assert_eq!(size + recovered_shreds.len(), num_shreds); assert_eq!(recovered_shreds.len(), removed_shreds.len()); removed_shreds.sort_by(|a, b| { diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs index e979091ed333ba..a45a5702bfe38f 100644 --- a/ledger/src/shredder.rs +++ b/ledger/src/shredder.rs @@ -4,6 +4,7 @@ use { }, itertools::Itertools, lazy_static::lazy_static, + lru::LruCache, rayon::{prelude::*, ThreadPool}, reed_solomon_erasure::{ galois_8::Field, @@ -13,7 +14,11 @@ use { solana_measure::measure::Measure, solana_rayon_threadlimit::get_thread_count, solana_sdk::{clock::Slot, signature::Keypair}, - std::{borrow::Borrow, fmt::Debug}, + std::{ + borrow::Borrow, + fmt::Debug, + sync::{Arc, Mutex}, + }, }; lazy_static! 
{ @@ -33,7 +38,11 @@ pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [ 55, 56, 58, 59, 60, 62, 63, 64, // 32 ]; -pub(crate) type ReedSolomon = reed_solomon_erasure::ReedSolomon; +type ReedSolomon = reed_solomon_erasure::ReedSolomon; + +pub struct ReedSolomonCache( + Mutex>>, +); #[derive(Debug)] pub struct Shredder { @@ -69,6 +78,7 @@ impl Shredder { is_last_in_slot: bool, next_shred_index: u32, next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, stats: &mut ProcessShredsStats, ) -> ( Vec, // data shreds @@ -76,9 +86,14 @@ impl Shredder { ) { let data_shreds = self.entries_to_data_shreds(keypair, entries, is_last_in_slot, next_shred_index, stats); - let coding_shreds = - Self::data_shreds_to_coding_shreds(keypair, &data_shreds, next_code_index, stats) - .unwrap(); + let coding_shreds = Self::data_shreds_to_coding_shreds( + keypair, + &data_shreds, + next_code_index, + reed_solomon_cache, + stats, + ) + .unwrap(); (data_shreds, coding_shreds) } @@ -155,6 +170,7 @@ impl Shredder { keypair: &Keypair, data_shreds: &[Shred], next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, process_stats: &mut ProcessShredsStats, ) -> Result, Error> { if data_shreds.is_empty() { @@ -185,7 +201,7 @@ impl Shredder { .into_par_iter() .zip(next_code_index) .flat_map(|(shreds, next_code_index)| { - Shredder::generate_coding_shreds(&shreds, next_code_index) + Shredder::generate_coding_shreds(&shreds, next_code_index, reed_solomon_cache) }) .collect() }); @@ -209,6 +225,7 @@ impl Shredder { pub fn generate_coding_shreds>( data: &[T], next_code_index: u32, + reed_solomon_cache: &ReedSolomonCache, ) -> Vec { let (slot, index, version, fec_set_index) = { let shred = data.first().unwrap().borrow(); @@ -238,7 +255,8 @@ impl Shredder { .collect::>() .unwrap(); let mut parity = vec![vec![0u8; data[0].len()]; num_coding]; - ReedSolomon::new(num_data, num_coding) + reed_solomon_cache + .get(num_data, num_coding) .unwrap() .encode_sep(&data, &mut parity[..]) .unwrap(); @@ -263,7 +281,10 @@ impl Shredder { .collect() } - pub fn try_recovery(shreds: Vec) -> Result, Error> { + pub fn try_recovery( + shreds: Vec, + reed_solomon_cache: &ReedSolomonCache, + ) -> Result, Error> { let (slot, fec_set_index) = match shreds.first() { None => return Err(Error::from(TooFewShardsPresent)), Some(shred) => (shred.slot(), shred.fec_set_index()), @@ -303,7 +324,9 @@ impl Shredder { mask[index] = true; } } - ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct_data(&mut shards)?; + reed_solomon_cache + .get(num_data_shreds, num_coding_shreds)? + .reconstruct_data(&mut shards)?; let recovered_data = mask .into_iter() .zip(shards) @@ -346,6 +369,38 @@ impl Shredder { } } +impl ReedSolomonCache { + const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK; + + pub(crate) fn get( + &self, + data_shards: usize, + parity_shards: usize, + ) -> Result, reed_solomon_erasure::Error> { + let key = (data_shards, parity_shards); + { + let mut cache = self.0.lock().unwrap(); + if let Some(entry) = cache.get(&key) { + return Ok(entry.clone()); + } + } + let entry = ReedSolomon::new(data_shards, parity_shards)?; + let entry = Arc::new(entry); + { + let entry = entry.clone(); + let mut cache = self.0.lock().unwrap(); + cache.put(key, entry); + } + Ok(entry) + } +} + +impl Default for ReedSolomonCache { + fn default() -> Self { + Self(Mutex::new(LruCache::new(Self::CAPACITY))) + } +} + /// Maps number of data shreds in each batch to the erasure batch size. 
fn get_erasure_batch_size(num_data_shreds: usize) -> usize { ERASURE_BATCH_SIZE @@ -444,6 +499,7 @@ mod tests { true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; @@ -521,6 +577,7 @@ mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); let deserialized_shred = @@ -551,6 +608,7 @@ mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.iter().for_each(|s| { @@ -586,6 +644,7 @@ mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); data_shreds.iter().for_each(|s| { @@ -630,6 +689,7 @@ mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); for (i, s) in data_shreds.iter().enumerate() { @@ -677,6 +737,7 @@ mod tests { }) .collect(); + let reed_solomon_cache = ReedSolomonCache::default(); let serialized_entries = bincode::serialize(&entries).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, @@ -684,6 +745,7 @@ mod tests { is_last_in_slot, 0, // next_shred_index 0, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let num_coding_shreds = coding_shreds.len(); @@ -703,12 +765,17 @@ mod tests { // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail assert_eq!( - Shredder::try_recovery(data_shreds[..data_shreds.len() - 1].to_vec()).unwrap(), + Shredder::try_recovery( + data_shreds[..data_shreds.len() - 1].to_vec(), + &reed_solomon_cache + ) + .unwrap(), Vec::default() ); // Test1: Try recovery/reassembly with only data shreds. Hint: should work - let recovered_data = Shredder::try_recovery(data_shreds[..].to_vec()).unwrap(); + let recovered_data = + Shredder::try_recovery(data_shreds[..].to_vec(), &reed_solomon_cache).unwrap(); assert!(recovered_data.is_empty()); // Test2: Try recovery/reassembly with missing data shreds + coding shreds. 
Hint: should work @@ -718,7 +785,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None }) .collect(); - let mut recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let mut recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing let recovered_shred = recovered_data.remove(0); @@ -758,7 +826,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { @@ -812,6 +881,7 @@ mod tests { true, // is_last_in_slot 25, // next_shred_index, 25, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); // We should have 10 shreds now @@ -829,7 +899,8 @@ mod tests { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing for (i, recovered_shred) in recovered_data.into_iter().enumerate() { @@ -853,7 +924,8 @@ mod tests { assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); // Test6: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); assert!(recovered_data.is_empty()); } @@ -897,12 +969,14 @@ mod tests { ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); + let reed_solomon_cache = ReedSolomonCache::default(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, &[entry], is_last_in_slot, next_shred_index, next_shred_index, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let num_data_shreds = data_shreds.len(); @@ -922,7 +996,7 @@ mod tests { .filter(|shred| shred.is_data()) .map(|shred| shred.index()) .collect(); - let recovered_shreds = Shredder::try_recovery(shreds).unwrap(); + let recovered_shreds = Shredder::try_recovery(shreds, &reed_solomon_cache).unwrap(); assert_eq!( recovered_shreds, data_shreds @@ -963,6 +1037,7 @@ mod tests { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); assert!(!data_shreds @@ -995,6 +1070,7 @@ mod tests { true, // is_last_in_slot start_index, // next_shred_index start_index, // next_code_index + &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK; @@ -1055,6 +1131,7 @@ mod tests { ); let next_code_index = data_shreds[0].index(); + let reed_solomon_cache = ReedSolomonCache::default(); for size in (1..data_shreds.len()).step_by(5) { let data_shreds = &data_shreds[..size]; @@ -1062,6 +1139,7 @@ mod tests { &keypair, data_shreds, next_code_index, + &reed_solomon_cache, &mut stats, ) .unwrap(); diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 23a1cf83f967f7..b788c3b230d696 100644 --- a/ledger/tests/shred.rs +++ 
b/ledger/tests/shred.rs @@ -2,8 +2,8 @@ use { solana_entry::entry::Entry, solana_ledger::shred::{ - max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder, - DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, + max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, ReedSolomonCache, + Shred, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY, }, solana_sdk::{ clock::Slot, @@ -47,6 +47,7 @@ fn test_multi_fec_block_coding() { }) .collect(); + let reed_solomon_cache = ReedSolomonCache::default(); let serialized_entries = bincode::serialize(&entries).unwrap(); let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &keypair, @@ -54,6 +55,7 @@ fn test_multi_fec_block_coding() { true, // is_last_in_slot 0, // next_shred_index 0, // next_code_index + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); let next_index = data_shreds.last().unwrap().index() + 1; @@ -81,7 +83,8 @@ fn test_multi_fec_block_coding() { .filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None }) .collect(); - let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap(); + let recovered_data = + Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap(); for (i, recovered_shred) in recovered_data.into_iter().enumerate() { let index = shred_start_index + (i * 2); @@ -115,6 +118,7 @@ fn test_multi_fec_block_different_size_coding() { setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone()); let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum(); + let reed_solomon_cache = ReedSolomonCache::default(); // Test recovery for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) { let first_data_index = fec_data_shreds.first().unwrap().index() as usize; @@ -124,7 +128,7 @@ fn test_multi_fec_block_different_size_coding() { .chain(fec_coding_shreds.iter().step_by(2)) .cloned() .collect(); - let recovered_data = Shredder::try_recovery(all_shreds).unwrap(); + let recovered_data = Shredder::try_recovery(all_shreds, &reed_solomon_cache).unwrap(); // Necessary in order to ensure the last shred in the slot // is part of the recovered set, and that the below `index` // calcuation in the loop is correct @@ -218,6 +222,7 @@ fn setup_different_sized_fec_blocks( let mut coding_slot_and_index = HashSet::new(); let total_num_data_shreds: usize = 2 * num_shreds_per_iter; + let reed_solomon_cache = ReedSolomonCache::default(); for i in 0..2 { let is_last = i == 1; let (data_shreds, coding_shreds) = shredder.entries_to_shreds( @@ -226,6 +231,7 @@ fn setup_different_sized_fec_blocks( is_last, next_shred_index, next_code_index, + &reed_solomon_cache, &mut ProcessShredsStats::default(), ); for shred in &data_shreds {
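
The core idea of the patch, in isolation: memoize one ReedSolomon encoder/decoder per
(data_shards, parity_shards) pair behind a mutex and hand out Arc clones, so the matrix and
inversion-tree setup done by ReedSolomon::new runs once per shard geometry instead of once per
FEC set. The sketch below is a minimal standalone illustration of that pattern, not the ledger
code: it uses the reed_solomon_erasure crate's public galois_8::ReedSolomon alias, and it swaps
the patch's capacity-bounded lru::LruCache (4 * DATA_SHREDS_PER_FEC_BLOCK entries) for an
unbounded HashMap; the main driver, shard counts, and shard sizes are made up for the example.

// Standalone sketch of the caching pattern introduced in ledger/src/shredder.rs.
// Assumes the `reed_solomon_erasure` crate; the real ReedSolomonCache bounds its
// cache with an lru::LruCache, which is replaced here by a plain HashMap.
use {
    reed_solomon_erasure::galois_8::ReedSolomon,
    std::{
        collections::HashMap,
        sync::{Arc, Mutex},
    },
};

#[derive(Default)]
pub struct ReedSolomonCache(Mutex<HashMap<(usize, usize), Arc<ReedSolomon>>>);

impl ReedSolomonCache {
    /// Returns a cached encoder/decoder for this shard geometry, building
    /// (and caching) one on first use.
    pub fn get(
        &self,
        data_shards: usize,
        parity_shards: usize,
    ) -> Result<Arc<ReedSolomon>, reed_solomon_erasure::Error> {
        let key = (data_shards, parity_shards);
        if let Some(entry) = self.0.lock().unwrap().get(&key) {
            return Ok(entry.clone());
        }
        // Built outside the lock so a miss on one geometry does not block
        // callers using other geometries behind the matrix/inversion-tree setup.
        let entry = Arc::new(ReedSolomon::new(data_shards, parity_shards)?);
        self.0.lock().unwrap().insert(key, entry.clone());
        Ok(entry)
    }
}

fn main() -> Result<(), reed_solomon_erasure::Error> {
    let cache = ReedSolomonCache::default();
    // Encode 4 data shards into 2 parity shards twice; the second iteration
    // reuses the Arc<ReedSolomon> created by the first instead of rebuilding it.
    for _ in 0..2 {
        let rs = cache.get(4, 2)?;
        let data: Vec<Vec<u8>> = (0u8..4).map(|i| vec![i; 16]).collect();
        let mut parity = vec![vec![0u8; 16]; 2];
        rs.encode_sep(&data, &mut parity)?;
    }
    Ok(())
}

As in the patch's ReedSolomonCache::get, the lock is released while a new instance is
constructed and re-taken only to insert it; the worst case is that two racing callers both
build the same instance and one insert overwrites the other, which is harmless.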