From 5727f694fe8754fd7f76e6fca800cd8e0207bbd8 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 18 Feb 2024 13:44:40 -0500 Subject: [PATCH 1/7] remove lock on fee updates --- Cargo.lock | 1 - runtime/Cargo.toml | 1 - runtime/src/prioritization_fee_cache.rs | 317 +++++++++++++----------- 3 files changed, 170 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a727fae2c8b0ee..52378a3b6ab1b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6939,7 +6939,6 @@ dependencies = [ "lazy_static", "libsecp256k1", "log", - "lru", "lz4", "memmap2", "memoffset 0.9.0", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b14ffab2076ca3..7a08076d451512 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -29,7 +29,6 @@ index_list = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } -lru = { workspace = true } lz4 = { workspace = true } memmap2 = { workspace = true } mockall = { workspace = true } diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 839519020ff42f..0d664685d6da21 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -3,7 +3,6 @@ use { crossbeam_channel::{unbounded, Receiver, Sender}, dashmap::DashMap, log::*, - lru::LruCache, solana_measure::measure, solana_sdk::{ clock::{BankId, Slot}, @@ -11,7 +10,7 @@ use { transaction::SanitizedTransaction, }, std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, @@ -118,6 +117,7 @@ impl PrioritizationFeeCacheMetrics { } } +#[derive(Debug)] enum CacheServiceUpdate { TransactionUpdate { slot: Slot, @@ -141,7 +141,7 @@ type SlotPrioritizationFee = DashMap; /// and collecting stats and reporting metrics. #[derive(Debug)] pub struct PrioritizationFeeCache { - cache: Arc>>>, + cache: Arc>>, service_thread: Option>, sender: Sender, metrics: Arc, @@ -166,17 +166,17 @@ impl Drop for PrioritizationFeeCache { impl PrioritizationFeeCache { pub fn new(capacity: u64) -> Self { - let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); + let cache = Arc::new(RwLock::new(BTreeMap::new())); let (sender, receiver) = unbounded(); - let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize))); + let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); - let cache_clone = cache.clone(); - let metrics_clone = metrics.clone(); let service_thread = Some( Builder::new() .name("solPrFeeCachSvc".to_string()) - .spawn(move || { - Self::service_loop(cache_clone, receiver, metrics_clone); + .spawn({ + let cache = cache.clone(); + let metrics = metrics.clone(); + move || Self::service_loop(cache, capacity as usize, receiver, metrics) }) .unwrap(), ); @@ -189,22 +189,6 @@ impl PrioritizationFeeCache { } } - /// Get prioritization fee entry, create new entry if necessary - fn get_prioritization_fee( - cache: Arc>>>, - slot: &Slot, - ) -> Arc { - let mut cache = cache.write().unwrap(); - match cache.get(slot) { - Some(entry) => Arc::clone(entry), - None => { - let entry = Arc::new(SlotPrioritizationFee::default()); - cache.put(*slot, Arc::clone(&entry)); - entry - } - } - } - /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only /// transactions have both valid compute_budget_details and account_locks will be used to update /// fee_cache asynchronously. @@ -279,50 +263,60 @@ impl PrioritizationFeeCache { }); } - /// Internal function is invoked by worker thread to update slot's minimum prioritization fee, - /// Cache lock contends here. + /// Internal function is invoked by worker thread to update slot's minimum prioritization fee. fn update_cache( - cache: Arc>>>, - slot: &Slot, - bank_id: &BankId, + unfinalized: &mut BTreeMap, + slot: Slot, + bank_id: BankId, transaction_fee: u64, writable_accounts: Arc>, - metrics: Arc, + metrics: &PrioritizationFeeCacheMetrics, ) { - let (slot_prioritization_fee, cache_lock_time) = - measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); - let (_, entry_update_time) = measure!( { - let mut block_prioritization_fee = slot_prioritization_fee - .entry(*bank_id) + let mut block_prioritization_fee = unfinalized + .entry(slot) + .or_default() + .entry(bank_id) .or_insert(PrioritizationFee::default()); block_prioritization_fee.update(transaction_fee, &writable_accounts) }, "entry_update_time" ); - metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); metrics.accumulate_successful_transaction_update_count(1); } fn finalize_slot( - cache: Arc>>>, - slot: &Slot, - bank_id: &BankId, - metrics: Arc, + unfinalized: &mut BTreeMap, + cache: &RwLock>, + cache_max_size: usize, + slot: Slot, + bank_id: BankId, + metrics: &PrioritizationFeeCacheMetrics, ) { - let (slot_prioritization_fee, cache_lock_time) = - measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + // remove unfinalized slots + // TODO: do we need to keep slots in buffer? or they always come in order? + loop { + match unfinalized.keys().next().cloned() { + Some(unfinalized_slot) if unfinalized_slot < slot - 128 => unfinalized.pop_first(), + _ => break, + }; + } + + let slot_prioritization_fee = match unfinalized.remove(&slot) { + Some(slot_prioritization_fee) => slot_prioritization_fee, + None => return, + }; // prune cache by evicting write account entry from prioritization fee if its fee is less // or equal to block's minimum transaction fee, because they are irrelevant in calculating // block minimum fee. - let (result, slot_finalize_time) = measure!( + let (_, slot_finalize_time) = measure!( { // Only retain priority fee reported from optimistically confirmed bank let pre_purge_bank_count = slot_prioritization_fee.len() as u64; - slot_prioritization_fee.retain(|id, _| id == bank_id); + slot_prioritization_fee.retain(|id, _| *id == bank_id); let post_purge_bank_count = slot_prioritization_fee.len() as u64; metrics.accumulate_total_purged_duplicated_bank_count( pre_purge_bank_count.saturating_sub(post_purge_bank_count), @@ -334,30 +328,42 @@ impl PrioritizationFeeCache { } let mut block_prioritization_fee = slot_prioritization_fee - .entry(*bank_id) + .entry(bank_id) .or_insert(PrioritizationFee::default()); - let result = block_prioritization_fee.mark_block_completed(); - block_prioritization_fee.report_metrics(*slot); - result + if let Err(err) = block_prioritization_fee.mark_block_completed() { + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); + } + block_prioritization_fee.report_metrics(slot); }, "slot_finalize_time" ); - metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); - if let Err(err) = result { - error!( - "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", - err - ); - } + // Create new cache entry + let (_, cache_lock_time) = measure!( + { + let mut cache = cache.write().unwrap(); + while cache.len() >= cache_max_size { + cache.pop_first(); + } + cache.insert(slot, slot_prioritization_fee); + }, + "cache_lock_time" + ); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); } fn service_loop( - cache: Arc>>>, + cache: Arc>>, + cache_max_size: usize, receiver: Receiver, metrics: Arc, ) { + let mut unfinalized = BTreeMap::::new(); + for update in receiver.iter() { match update { CacheServiceUpdate::TransactionUpdate { @@ -366,15 +372,22 @@ impl PrioritizationFeeCache { transaction_fee, writable_accounts, } => Self::update_cache( - cache.clone(), - &slot, - &bank_id, + &mut unfinalized, + slot, + bank_id, transaction_fee, writable_accounts, - metrics.clone(), + &metrics, ), CacheServiceUpdate::BankFinalized { slot, bank_id } => { - Self::finalize_slot(cache.clone(), &slot, &bank_id, metrics.clone()); + Self::finalize_slot( + &mut unfinalized, + &cache, + cache_max_size, + slot, + bank_id, + &metrics, + ); metrics.report(slot); } @@ -387,16 +400,7 @@ impl PrioritizationFeeCache { /// Returns number of blocks that have finalized minimum fees collection pub fn available_block_count(&self) -> usize { - self.cache - .read() - .unwrap() - .iter() - .filter(|(_slot, slot_prioritization_fee)| { - slot_prioritization_fee - .iter() - .any(|prioritization_fee| prioritization_fee.is_finalized()) - }) - .count() + self.cache.read().unwrap().len() } pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> HashMap { @@ -408,22 +412,19 @@ impl PrioritizationFeeCache { slot_prioritization_fee .iter() .find_map(|prioritization_fee| { - prioritization_fee.is_finalized().then(|| { - let mut fee = prioritization_fee - .get_min_transaction_fee() - .unwrap_or_default(); - for account_key in account_keys { - if let Some(account_fee) = - prioritization_fee.get_writable_account_fee(account_key) - { - fee = std::cmp::max(fee, account_fee); - } + let mut fee = prioritization_fee + .get_min_transaction_fee() + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + prioritization_fee.get_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); } - Some((*slot, fee)) - }) + } + Some((*slot, fee)) }) }) - .flatten() .collect() } } @@ -493,18 +494,22 @@ mod tests { slot: Slot, bank_id: BankId, ) { + // mark as finalized prioritization_fee_cache.finalize_priority_fee(slot, bank_id); - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); // wait till finalization is done - while !fee - .get(&bank_id) - .map_or(false, |block_fee| block_fee.is_finalized()) - { - std::thread::sleep(std::time::Duration::from_millis(100)); + loop { + let cache = prioritization_fee_cache.cache.read().unwrap(); + if let Some(slot_cache) = cache.get(&slot) { + if let Some(block_fee) = slot_cache.get(&bank_id) { + if block_fee.is_finalized() { + return; + } + } + } + drop(cache); + + std::thread::sleep(std::time::Duration::from_millis(10)); } } @@ -538,28 +543,25 @@ mod tests { // assert block minimum fee and account a, b, c fee accordingly { - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - let fee = fee.get(&bank.bank_id()).unwrap(); - assert_eq!(2, fee.get_min_transaction_fee().unwrap()); - assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); - assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); - assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap()); - // assert unknown account d fee - assert!(fee - .get_writable_account_fee(&Pubkey::new_unique()) - .is_none()); + // Not possible to check the state in the thread + // let lock = prioritization_fee_cache.cache.read().unwrap(); + // let fee = lock.get(&slot).unwrap(); + // let fee = fee.get(&bank.bank_id()).unwrap(); + // assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + // assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); + // assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); + // assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap()); + // // assert unknown account d fee + // assert!(fee + // .get_writable_account_fee(&Pubkey::new_unique()) + // .is_none()); } // assert after prune, account a and c should be removed from cache to save space { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); + let lock = prioritization_fee_cache.cache.read().unwrap(); + let fee = lock.get(&slot).unwrap(); let fee = fee.get(&bank.bank_id()).unwrap(); assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert!(fee.get_writable_account_fee(&write_account_a).is_none()); @@ -572,28 +574,49 @@ mod tests { fn test_available_block_count() { let prioritization_fee_cache = PrioritizationFeeCache::default(); - assert!(PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &1 - ) - .entry(1) - .or_default() - .mark_block_completed() - .is_ok()); - assert!(PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &2 - ) - .entry(2) - .or_default() - .mark_block_completed() - .is_ok()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank0); + let bank = bank_forks.read().unwrap().working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + + let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 1)); + sync_update( + &prioritization_fee_cache, + bank1.clone(), + vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )] + .iter(), + ); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); + + let bank2 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 2)); + sync_update( + &prioritization_fee_cache, + bank2.clone(), + vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )] + .iter(), + ); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); + // add slot 3 entry to cache, but not finalize it - PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3) - .entry(3) - .or_default(); + let bank3 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 3)); + let txs = vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )]; + sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); // assert available block count should be 2 finalized blocks + std::thread::sleep(std::time::Duration::from_millis(1_000)); assert_eq!(2, prioritization_fee_cache.available_block_count()); } @@ -883,12 +906,13 @@ mod tests { ]; sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - assert_eq!(1, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // Not possible to check the state in the thread + // let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + // prioritization_fee_cache.cache.clone(), + // &slot, + // ); + // assert_eq!(1, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); } // Assert after add transactions for bank2 of slot 1 @@ -903,23 +927,22 @@ mod tests { ]; sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - assert_eq!(2, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); - assert!(slot_prioritization_fee.contains_key(&bank2.bank_id())); + // Not possible to check the state in the thread + // let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + // prioritization_fee_cache.cache.clone(), + // &slot, + // ); + // assert_eq!(2, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // assert!(slot_prioritization_fee.contains_key(&bank2.bank_id())); } // Assert after finalize with bank1 of slot 1, { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank1.bank_id()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); + let cache_lock = prioritization_fee_cache.cache.read().unwrap(); + let slot_prioritization_fee = cache_lock.get(&slot).unwrap(); assert_eq!(1, slot_prioritization_fee.len()); assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); From 920673f0a343dbef7acd77757597d4354566263f Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 18 Feb 2024 13:48:04 -0500 Subject: [PATCH 2/7] remove not required Arc --- runtime/src/prioritization_fee_cache.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 0d664685d6da21..acc47683c81c38 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -123,7 +123,7 @@ enum CacheServiceUpdate { slot: Slot, bank_id: BankId, transaction_fee: u64, - writable_accounts: Arc>, + writable_accounts: Vec, }, BankFinalized { slot: Slot, @@ -219,14 +219,12 @@ impl PrioritizationFeeCache { continue; } - let writable_accounts = Arc::new( - account_locks - .unwrap() - .writable - .iter() - .map(|key| **key) - .collect::>(), - ); + let writable_accounts = account_locks + .unwrap() + .writable + .iter() + .map(|key| **key) + .collect::>(); self.sender .send(CacheServiceUpdate::TransactionUpdate { @@ -269,7 +267,7 @@ impl PrioritizationFeeCache { slot: Slot, bank_id: BankId, transaction_fee: u64, - writable_accounts: Arc>, + writable_accounts: &[Pubkey], metrics: &PrioritizationFeeCacheMetrics, ) { let (_, entry_update_time) = measure!( @@ -279,7 +277,7 @@ impl PrioritizationFeeCache { .or_default() .entry(bank_id) .or_insert(PrioritizationFee::default()); - block_prioritization_fee.update(transaction_fee, &writable_accounts) + block_prioritization_fee.update(transaction_fee, writable_accounts) }, "entry_update_time" ); @@ -376,7 +374,7 @@ impl PrioritizationFeeCache { slot, bank_id, transaction_fee, - writable_accounts, + &writable_accounts, &metrics, ), CacheServiceUpdate::BankFinalized { slot, bank_id } => { From 4c0089d67b2c06c2e4efaad2b387b3474be495ff Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 18 Feb 2024 16:21:09 -0500 Subject: [PATCH 3/7] remove not required dashmap --- runtime/src/prioritization_fee_cache.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index acc47683c81c38..c4aeb9f723dd72 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -1,7 +1,6 @@ use { crate::{bank::Bank, compute_budget_details::GetComputeBudgetDetails, prioritization_fee::*}, crossbeam_channel::{unbounded, Receiver, Sender}, - dashmap::DashMap, log::*, solana_measure::measure, solana_sdk::{ @@ -134,7 +133,7 @@ enum CacheServiceUpdate { /// Potentially there are more than one bank that updates Prioritization Fee /// for a slot. The updates are tracked and finalized by bank_id. -type SlotPrioritizationFee = DashMap; +type SlotPrioritizationFee = HashMap; /// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, /// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, @@ -272,7 +271,7 @@ impl PrioritizationFeeCache { ) { let (_, entry_update_time) = measure!( { - let mut block_prioritization_fee = unfinalized + let block_prioritization_fee = unfinalized .entry(slot) .or_default() .entry(bank_id) @@ -302,7 +301,7 @@ impl PrioritizationFeeCache { }; } - let slot_prioritization_fee = match unfinalized.remove(&slot) { + let mut slot_prioritization_fee = match unfinalized.remove(&slot) { Some(slot_prioritization_fee) => slot_prioritization_fee, None => return, }; @@ -325,7 +324,7 @@ impl PrioritizationFeeCache { warn!("Finalized bank has empty prioritization fee cache. slot {slot} bank id {bank_id}"); } - let mut block_prioritization_fee = slot_prioritization_fee + let block_prioritization_fee = slot_prioritization_fee .entry(bank_id) .or_insert(PrioritizationFee::default()); if let Err(err) = block_prioritization_fee.mark_block_completed() { @@ -408,7 +407,7 @@ impl PrioritizationFeeCache { .iter() .filter_map(|(slot, slot_prioritization_fee)| { slot_prioritization_fee - .iter() + .values() .find_map(|prioritization_fee| { let mut fee = prioritization_fee .get_min_transaction_fee() From 734cdc1724fcf1c94bc6dd2cd1c82a2495153a98 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 18 Feb 2024 17:26:00 -0500 Subject: [PATCH 4/7] remove not required HashMap --- runtime/src/prioritization_fee_cache.rs | 99 ++++++++++++------------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index c4aeb9f723dd72..385cb8e0fe313f 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -140,7 +140,7 @@ type SlotPrioritizationFee = HashMap; /// and collecting stats and reporting metrics. #[derive(Debug)] pub struct PrioritizationFeeCache { - cache: Arc>>, + cache: Arc>>, service_thread: Option>, sender: Sender, metrics: Arc, @@ -286,7 +286,7 @@ impl PrioritizationFeeCache { fn finalize_slot( unfinalized: &mut BTreeMap, - cache: &RwLock>, + cache: &RwLock>, cache_max_size: usize, slot: Slot, bank_id: BankId, @@ -309,12 +309,13 @@ impl PrioritizationFeeCache { // prune cache by evicting write account entry from prioritization fee if its fee is less // or equal to block's minimum transaction fee, because they are irrelevant in calculating // block minimum fee. - let (_, slot_finalize_time) = measure!( + let (slot_prioritization_fee, slot_finalize_time) = measure!( { // Only retain priority fee reported from optimistically confirmed bank let pre_purge_bank_count = slot_prioritization_fee.len() as u64; - slot_prioritization_fee.retain(|id, _| *id == bank_id); - let post_purge_bank_count = slot_prioritization_fee.len() as u64; + let mut slot_prioritization_fee = slot_prioritization_fee.remove(&bank_id); + let post_purge_bank_count = + slot_prioritization_fee.as_ref().map(|_| 1).unwrap_or(1); metrics.accumulate_total_purged_duplicated_bank_count( pre_purge_bank_count.saturating_sub(post_purge_bank_count), ); @@ -324,37 +325,39 @@ impl PrioritizationFeeCache { warn!("Finalized bank has empty prioritization fee cache. slot {slot} bank id {bank_id}"); } - let block_prioritization_fee = slot_prioritization_fee - .entry(bank_id) - .or_insert(PrioritizationFee::default()); - if let Err(err) = block_prioritization_fee.mark_block_completed() { - error!( - "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", - err - ); + if let Some(slot_prioritization_fee) = &mut slot_prioritization_fee { + if let Err(err) = slot_prioritization_fee.mark_block_completed() { + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); + } + slot_prioritization_fee.report_metrics(slot); } - block_prioritization_fee.report_metrics(slot); + slot_prioritization_fee }, "slot_finalize_time" ); metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); // Create new cache entry - let (_, cache_lock_time) = measure!( - { - let mut cache = cache.write().unwrap(); - while cache.len() >= cache_max_size { - cache.pop_first(); - } - cache.insert(slot, slot_prioritization_fee); - }, - "cache_lock_time" - ); - metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + if let Some(slot_prioritization_fee) = slot_prioritization_fee { + let (_, cache_lock_time) = measure!( + { + let mut cache = cache.write().unwrap(); + while cache.len() >= cache_max_size { + cache.pop_first(); + } + cache.insert(slot, slot_prioritization_fee); + }, + "cache_lock_time" + ); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + } } fn service_loop( - cache: Arc>>, + cache: Arc>>, cache_max_size: usize, receiver: Receiver, metrics: Arc, @@ -405,22 +408,18 @@ impl PrioritizationFeeCache { .read() .unwrap() .iter() - .filter_map(|(slot, slot_prioritization_fee)| { - slot_prioritization_fee - .values() - .find_map(|prioritization_fee| { - let mut fee = prioritization_fee - .get_min_transaction_fee() - .unwrap_or_default(); - for account_key in account_keys { - if let Some(account_fee) = - prioritization_fee.get_writable_account_fee(account_key) - { - fee = std::cmp::max(fee, account_fee); - } - } - Some((*slot, fee)) - }) + .map(|(slot, slot_prioritization_fee)| { + let mut fee = slot_prioritization_fee + .get_min_transaction_fee() + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + slot_prioritization_fee.get_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); + } + } + (*slot, fee) }) .collect() } @@ -498,10 +497,8 @@ mod tests { loop { let cache = prioritization_fee_cache.cache.read().unwrap(); if let Some(slot_cache) = cache.get(&slot) { - if let Some(block_fee) = slot_cache.get(&bank_id) { - if block_fee.is_finalized() { - return; - } + if slot_cache.is_finalized() { + return; } } drop(cache); @@ -559,7 +556,6 @@ mod tests { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); let lock = prioritization_fee_cache.cache.read().unwrap(); let fee = lock.get(&slot).unwrap(); - let fee = fee.get(&bank.bank_id()).unwrap(); assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert!(fee.get_writable_account_fee(&write_account_a).is_none()); assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); @@ -938,10 +934,11 @@ mod tests { { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank1.bank_id()); - let cache_lock = prioritization_fee_cache.cache.read().unwrap(); - let slot_prioritization_fee = cache_lock.get(&slot).unwrap(); - assert_eq!(1, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // Not possible to check bank_id + // let cache_lock = prioritization_fee_cache.cache.read().unwrap(); + // let slot_prioritization_fee = cache_lock.get(&slot).unwrap(); + // assert_eq!(1, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); // and data available for query are from bank1 assert_eq!( From 49f6880dde56209f1cf3fe80ad2e7d2657832469 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 18 Feb 2024 18:47:06 -0500 Subject: [PATCH 5/7] update transactions in one message --- runtime/src/prioritization_fee.rs | 23 ++-- runtime/src/prioritization_fee_cache.rs | 135 +++++++++++++----------- 2 files changed, 78 insertions(+), 80 deletions(-) diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index 90cc66b981ce3a..45425059f98c15 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -165,11 +165,7 @@ impl Default for PrioritizationFee { impl PrioritizationFee { /// Update self for minimum transaction fee in the block and minimum fee for each writable account. - pub fn update( - &mut self, - transaction_fee: u64, - writable_accounts: &[Pubkey], - ) -> Result<(), PrioritizationFeeError> { + pub fn update(&mut self, transaction_fee: u64, writable_accounts: Vec) { let (_, update_time) = measure!( { if !self.is_finalized { @@ -177,9 +173,9 @@ impl PrioritizationFee { self.min_transaction_fee = transaction_fee; } - for write_account in writable_accounts.iter() { + for write_account in writable_accounts { self.min_writable_account_fees - .entry(*write_account) + .entry(write_account) .and_modify(|write_lock_fee| { *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) }) @@ -199,7 +195,6 @@ impl PrioritizationFee { self.metrics .accumulate_total_update_elapsed_us(update_time.as_us()); - Ok(()) } /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are @@ -283,9 +278,7 @@ mod tests { // ----------------------------------------------------------------------- // [5, a, b ] --> [5, 5, 5, nil ] { - assert!(prioritization_fee - .update(5, &[write_account_a, write_account_b]) - .is_ok()); + prioritization_fee.update(5, vec![write_account_a, write_account_b]); assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, @@ -309,9 +302,7 @@ mod tests { // ----------------------------------------------------------------------- // [9, b, c ] --> [5, 5, 5, 9 ] { - assert!(prioritization_fee - .update(9, &[write_account_b, write_account_c]) - .is_ok()); + prioritization_fee.update(9, vec![write_account_b, write_account_c]); assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, @@ -338,9 +329,7 @@ mod tests { // ----------------------------------------------------------------------- // [2, a, c ] --> [2, 2, 5, 2 ] { - assert!(prioritization_fee - .update(2, &[write_account_a, write_account_c]) - .is_ok()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 2, diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 385cb8e0fe313f..e601426884ce8e 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -116,13 +116,18 @@ impl PrioritizationFeeCacheMetrics { } } +#[derive(Debug)] +struct CacheTransactionUpdate { + transaction_fee: u64, + writable_accounts: Vec, +} + #[derive(Debug)] enum CacheServiceUpdate { - TransactionUpdate { + TransactionsUpdate { slot: Slot, bank_id: BankId, - transaction_fee: u64, - writable_accounts: Vec, + updates: Vec, }, BankFinalized { slot: Slot, @@ -188,57 +193,63 @@ impl PrioritizationFeeCache { } } - /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only + /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only /// transactions have both valid compute_budget_details and account_locks will be used to update /// fee_cache asynchronously. pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator) { let (_, send_updates_time) = measure!( { - for sanitized_transaction in txs { - // Vote transactions are not prioritized, therefore they are excluded from - // updating fee_cache. - if sanitized_transaction.is_simple_vote_transaction() { - continue; - } - - let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price) - let compute_budget_details = sanitized_transaction - .get_compute_budget_details(round_compute_unit_price_enabled); - let account_locks = sanitized_transaction - .get_account_locks(bank.get_transaction_account_lock_limit()); - - if compute_budget_details.is_none() || account_locks.is_err() { - continue; - } - let compute_budget_details = compute_budget_details.unwrap(); - - // filter out any transaction that requests zero compute_unit_limit - // since its priority fee amount is not instructive - if compute_budget_details.compute_unit_limit == 0 { - continue; - } - - let writable_accounts = account_locks - .unwrap() - .writable - .iter() - .map(|key| **key) - .collect::>(); - - self.sender - .send(CacheServiceUpdate::TransactionUpdate { - slot: bank.slot(), - bank_id: bank.bank_id(), + let updates = txs + .filter_map(|sanitized_transaction| { + // Vote transactions are not prioritized, therefore they are excluded from + // updating fee_cache. + if sanitized_transaction.is_simple_vote_transaction() { + return None; + } + + let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price) + let compute_budget_details = sanitized_transaction + .get_compute_budget_details(round_compute_unit_price_enabled); + let account_locks = sanitized_transaction + .get_account_locks(bank.get_transaction_account_lock_limit()); + + if compute_budget_details.is_none() || account_locks.is_err() { + return None; + } + let compute_budget_details = compute_budget_details.unwrap(); + + // filter out any transaction that requests zero compute_unit_limit + // since its priority fee amount is not instructive + if compute_budget_details.compute_unit_limit == 0 { + return None; + } + + let writable_accounts = account_locks + .unwrap() + .writable + .iter() + .map(|key| **key) + .collect::>(); + + Some(CacheTransactionUpdate { transaction_fee: compute_budget_details.compute_unit_price, writable_accounts, }) - .unwrap_or_else(|err| { - warn!( - "prioritization fee cache transaction updates failed: {:?}", - err - ); - }); - } + }) + .collect(); + + self.sender + .send(CacheServiceUpdate::TransactionsUpdate { + slot: bank.slot(), + bank_id: bank.bank_id(), + updates, + }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); + }); }, "send_updates", ); @@ -265,10 +276,10 @@ impl PrioritizationFeeCache { unfinalized: &mut BTreeMap, slot: Slot, bank_id: BankId, - transaction_fee: u64, - writable_accounts: &[Pubkey], + updates: Vec, metrics: &PrioritizationFeeCacheMetrics, ) { + let transaction_update_count = updates.len() as u64; let (_, entry_update_time) = measure!( { let block_prioritization_fee = unfinalized @@ -276,12 +287,19 @@ impl PrioritizationFeeCache { .or_default() .entry(bank_id) .or_insert(PrioritizationFee::default()); - block_prioritization_fee.update(transaction_fee, writable_accounts) + + for CacheTransactionUpdate { + transaction_fee, + writable_accounts, + } in updates + { + block_prioritization_fee.update(transaction_fee, writable_accounts); + } }, "entry_update_time" ); metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); - metrics.accumulate_successful_transaction_update_count(1); + metrics.accumulate_successful_transaction_update_count(transaction_update_count); } fn finalize_slot( @@ -366,19 +384,11 @@ impl PrioritizationFeeCache { for update in receiver.iter() { match update { - CacheServiceUpdate::TransactionUpdate { + CacheServiceUpdate::TransactionsUpdate { slot, bank_id, - transaction_fee, - writable_accounts, - } => Self::update_cache( - &mut unfinalized, - slot, - bank_id, - transaction_fee, - &writable_accounts, - &metrics, - ), + updates, + } => Self::update_cache(&mut unfinalized, slot, bank_id, updates, &metrics), CacheServiceUpdate::BankFinalized { slot, bank_id } => { Self::finalize_slot( &mut unfinalized, @@ -388,7 +398,6 @@ impl PrioritizationFeeCache { bank_id, &metrics, ); - metrics.report(slot); } CacheServiceUpdate::Exit => { @@ -480,7 +489,7 @@ mod tests { .load(Ordering::Relaxed) != expected_update_count { - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(10)); } } From 7b7ee6d9b902452596f46cb346f32350551b7fd0 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Mon, 19 Feb 2024 09:55:56 -0500 Subject: [PATCH 6/7] make clippy happy --- runtime/src/prioritization_fee_cache.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index e601426884ce8e..985ad16f4ebf57 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -286,7 +286,7 @@ impl PrioritizationFeeCache { .entry(slot) .or_default() .entry(bank_id) - .or_insert(PrioritizationFee::default()); + .or_default(); for CacheTransactionUpdate { transaction_fee, @@ -319,10 +319,7 @@ impl PrioritizationFeeCache { }; } - let mut slot_prioritization_fee = match unfinalized.remove(&slot) { - Some(slot_prioritization_fee) => slot_prioritization_fee, - None => return, - }; + let Some(mut slot_prioritization_fee) = unfinalized.remove(&slot) else { return }; // prune cache by evicting write account entry from prioritization fee if its fee is less // or equal to block's minimum transaction fee, because they are irrelevant in calculating From 00811c16792b926d3ca5de37450ca8e7a4ea98b4 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 21 Feb 2024 19:58:56 -0500 Subject: [PATCH 7/7] use vec instead of hashmap --- runtime/src/prioritization_fee_cache.rs | 80 ++++++++++++------------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 985ad16f4ebf57..54b7810c2186d5 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -193,7 +193,7 @@ impl PrioritizationFeeCache { } } - /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only + /// Update with a list of non-vote transactions' compute_budget_details and account_locks; Only /// transactions have both valid compute_budget_details and account_locks will be used to update /// fee_cache asynchronously. pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator) { @@ -409,7 +409,7 @@ impl PrioritizationFeeCache { self.cache.read().unwrap().len() } - pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> HashMap { + pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> Vec<(Slot, u64)> { self.cache .read() .unwrap() @@ -619,10 +619,6 @@ mod tests { assert_eq!(2, prioritization_fee_cache.available_block_count()); } - fn hashmap_of(vec: Vec<(Slot, u64)>) -> HashMap { - vec.into_iter().collect() - } - #[test] fn test_get_prioritization_fees() { solana_logger::setup(); @@ -694,28 +690,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -737,28 +733,28 @@ mod tests { sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); // before block is marked as completed assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -768,28 +764,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); assert_eq!( - hashmap_of(vec![(2, 3), (1, 1)]), + vec![(2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(2, 3), (1, 2)]), + vec![(2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 1)]), + vec![(2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -811,28 +807,28 @@ mod tests { sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); // before block is marked as completed assert_eq!( - hashmap_of(vec![(2, 3), (1, 1)]), + vec![(2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(2, 3), (1, 2)]), + vec![(2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 1)]), + vec![(2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -842,28 +838,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id()); assert_eq!( - hashmap_of(vec![(3, 5), (2, 3), (1, 1)]), + vec![(3, 5), (2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 3), (1, 2)]), + vec![(3, 6), (2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(3, 5), (2, 4), (1, 2)]), + vec![(3, 5), (2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 1)]), + vec![(3, 6), (2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 2)]), + vec![(3, 6), (2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 2)]), + vec![(3, 6), (2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -948,28 +944,28 @@ mod tests { // and data available for query are from bank1 assert_eq!( - hashmap_of(vec![(slot, 1)]), + vec![(slot, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(slot, 1)]), + vec![(slot, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b,