From 251054d8c9390c8744f9e229e69be4481065a30e Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 1 Apr 2020 20:42:04 -0600 Subject: [PATCH] Backport confirmations fix. (#9252) automerge --- core/src/commitment.rs | 193 ++++++++++++++++--------------- core/src/replay_stage.rs | 39 +++++-- core/src/rpc.rs | 107 +++++++++++------- core/src/rpc_pubsub.rs | 70 ++++++++++-- core/src/rpc_pubsub_service.rs | 31 +++-- core/src/rpc_subscriptions.rs | 200 ++++++++++++++++++++------------- core/src/storage_stage.rs | 61 ++++++---- core/src/tvu.rs | 8 +- core/src/validator.rs | 2 +- core/tests/client.rs | 11 +- core/tests/storage_stage.rs | 53 +++++---- docs/src/apps/jsonrpc-api.md | 5 +- runtime/src/bank/mod.rs | 20 ++-- runtime/src/bank_client.rs | 29 ++--- runtime/src/status_cache.rs | 63 ++--------- 15 files changed, 518 insertions(+), 374 deletions(-) diff --git a/core/src/commitment.rs b/core/src/commitment.rs index a7df5e328f23c2..7cefe23e248275 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,3 +1,6 @@ +use crate::consensus::VOTE_THRESHOLD_SIZE; +use solana_measure::measure::Measure; +use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; @@ -31,17 +34,40 @@ impl BlockCommitment { } } -#[derive(Debug, Default)] +#[derive(Default)] pub struct BlockCommitmentCache { block_commitment: HashMap, total_stake: u64, + bank: Arc, + root: Slot, +} + +impl std::fmt::Debug for BlockCommitmentCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockCommitmentCache") + .field("block_commitment", &self.block_commitment) + .field("total_stake", &self.total_stake) + .field( + "bank", + &format_args!("Bank({{current_slot: {:?}}})", self.bank.slot()), + ) + .field("root", &self.root) + .finish() + } } impl BlockCommitmentCache { - pub fn new(block_commitment: HashMap, total_stake: u64) -> Self { + pub fn new( + block_commitment: HashMap, + total_stake: u64, + bank: Arc, + root: Slot, + ) -> Self { Self { block_commitment, total_stake, + bank, + root, } } @@ -53,38 +79,62 @@ impl BlockCommitmentCache { self.total_stake } - pub fn get_block_with_depth_commitment( - &self, - minimum_depth: usize, - minimum_stake_percentage: f64, - ) -> Option { - self.block_commitment - .iter() - .filter(|&(_, block_commitment)| { - let fork_stake_minimum_depth: u64 = block_commitment.commitment[minimum_depth..] - .iter() - .cloned() - .sum(); - fork_stake_minimum_depth as f64 / self.total_stake as f64 - >= minimum_stake_percentage - }) - .map(|(slot, _)| *slot) - .max() + pub fn bank(&self) -> Arc { + self.bank.clone() } - pub fn get_rooted_block_with_commitment(&self, minimum_stake_percentage: f64) -> Option { - self.get_block_with_depth_commitment(MAX_LOCKOUT_HISTORY - 1, minimum_stake_percentage) + pub fn slot(&self) -> Slot { + self.bank.slot() + } + + pub fn root(&self) -> Slot { + self.root + } + + pub fn get_confirmation_count(&self, slot: Slot) -> Option { + self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE) + } + + // Returns the lowest level at which at least `minimum_stake_percentage` of the total epoch + // stake is locked out + fn get_lockout_count(&self, slot: Slot, minimum_stake_percentage: f64) -> Option { + self.get_block_commitment(slot).map(|block_commitment| { + let iterator = block_commitment.commitment.iter().enumerate().rev(); + let mut sum = 0; + for (i, stake) in iterator { + sum += stake; + if (sum as f64 / self.total_stake as f64) > minimum_stake_percentage { + return i + 1; + } + } + 0 + }) + } + #[cfg(test)] + pub fn new_for_tests() -> Self { + let mut block_commitment: HashMap = HashMap::new(); + block_commitment.insert(0, BlockCommitment::default()); + Self { + block_commitment, + total_stake: 42, + ..Self::default() + } } } pub struct CommitmentAggregationData { bank: Arc, + root: Slot, total_staked: u64, } impl CommitmentAggregationData { - pub fn new(bank: Arc, total_staked: u64) -> Self { - Self { bank, total_staked } + pub fn new(bank: Arc, root: Slot, total_staked: u64) -> Self { + Self { + bank, + root, + total_staked, + } } } @@ -144,14 +194,24 @@ impl AggregateCommitmentService { continue; } + let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank); - let mut new_block_commitment = - BlockCommitmentCache::new(block_commitment, aggregation_data.total_staked); + let mut new_block_commitment = BlockCommitmentCache::new( + block_commitment, + aggregation_data.total_staked, + aggregation_data.bank, + aggregation_data.root, + ); let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + aggregate_commitment_time.stop(); + inc_new_counter_info!( + "aggregate-commitment-ms", + aggregate_commitment_time.as_ms() as usize + ); } } @@ -246,84 +306,31 @@ mod tests { } #[test] - fn test_get_block_with_depth_commitment() { + fn test_get_confirmations() { + let bank = Arc::new(Bank::default()); // Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots let mut cache0 = BlockCommitment::default(); - cache0.increase_confirmation_stake(1, 15); - cache0.increase_confirmation_stake(2, 25); + cache0.increase_confirmation_stake(1, 5); + cache0.increase_confirmation_stake(2, 40); let mut cache1 = BlockCommitment::default(); - cache1.increase_confirmation_stake(1, 10); - cache1.increase_confirmation_stake(2, 20); + cache1.increase_confirmation_stake(1, 40); + cache1.increase_confirmation_stake(2, 5); - let mut block_commitment = HashMap::new(); - block_commitment.entry(0).or_insert(cache0.clone()); - block_commitment.entry(1).or_insert(cache1.clone()); - let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50); - - // Neither slot has rooted votes - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.1), - None - ); - // Neither slot meets the minimum level of commitment 0.6 at depth 1 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.6), - None - ); - // Only slot 0 meets the minimum level of commitment 0.5 at depth 1 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.5), - Some(0) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(1, 0.4), - Some(1) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(0, 0.6), - Some(1) - ); - // Neither slot meets the minimum level of commitment 0.9 at depth 0 - assert_eq!( - block_commitment_cache.get_block_with_depth_commitment(0, 0.9), - None - ); - } - - #[test] - fn test_get_rooted_block_with_commitment() { - // Build BlockCommitmentCache with rooted votes - let mut cache0 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]); - cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 40); - cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10); - let mut cache1 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 30); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10); - cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 2, 10); + let mut cache2 = BlockCommitment::default(); + cache2.increase_confirmation_stake(1, 20); + cache2.increase_confirmation_stake(2, 5); let mut block_commitment = HashMap::new(); block_commitment.entry(0).or_insert(cache0.clone()); block_commitment.entry(1).or_insert(cache1.clone()); - let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50); + block_commitment.entry(2).or_insert(cache2.clone()); + let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0); - // Only slot 0 meets the minimum level of commitment 0.66 at root - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.66), - Some(0) - ); - // If multiple slots meet the minimum level of commitment, method should return the most recent - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.6), - Some(1) - ); - // Neither slot meets the minimum level of commitment 0.9 at root - assert_eq!( - block_commitment_cache.get_rooted_block_with_commitment(0.9), - None - ); + assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2)); + assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1)); + assert_eq!(block_commitment_cache.get_confirmation_count(2), Some(0),); + assert_eq!(block_commitment_cache.get_confirmation_count(3), None,); } #[test] diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c0a63f2f91de6d..034b67a89b907b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -192,7 +192,7 @@ impl ReplayStage { // Start the replay stage loop let (lockouts_sender, commitment_service) = - AggregateCommitmentService::new(&exit, block_commitment_cache); + AggregateCommitmentService::new(&exit, block_commitment_cache.clone()); #[allow(clippy::cognitive_complexity)] let t_replay = Builder::new() @@ -309,7 +309,10 @@ impl ReplayStage { let start = allocated.get(); if !is_locked_out && vote_threshold { info!("voting: {} {}", bank.slot(), fork_weight); - subscriptions.notify_subscribers(bank.slot(), &bank_forks); + subscriptions.notify_subscribers( + block_commitment_cache.read().unwrap().slot(), + &bank_forks, + ); if let Some(votable_leader) = leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank)) { @@ -647,7 +650,13 @@ impl ReplayStage { return Err(e.into()); } } - Self::update_commitment_cache(bank.clone(), total_staked, lockouts_sender); + + Self::update_commitment_cache( + bank.clone(), + bank_forks.read().unwrap().root(), + total_staked, + lockouts_sender, + ); if let Some(ref voting_keypair) = voting_keypair { let node_keypair = cluster_info.read().unwrap().keypair.clone(); @@ -675,10 +684,13 @@ impl ReplayStage { fn update_commitment_cache( bank: Arc, + root: Slot, total_staked: u64, lockouts_sender: &Sender, ) { - if let Err(e) = lockouts_sender.send(CommitmentAggregationData::new(bank, total_staked)) { + if let Err(e) = + lockouts_sender.send(CommitmentAggregationData::new(bank, root, total_staked)) + { trace!("lockouts_sender failed: {:?}", e); } } @@ -1397,7 +1409,10 @@ pub(crate) mod tests { let bank0 = Bank::new(&genesis_config); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )); let bank_forks = BankForks::new(0, bank0); bank_forks.working_bank().freeze(); @@ -1766,7 +1781,12 @@ pub(crate) mod tests { bank_forks.write().unwrap().insert(bank1); let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); leader_vote(&arc_bank1, &leader_voting_pubkey); - ReplayStage::update_commitment_cache(arc_bank1.clone(), leader_lamports, &lockouts_sender); + ReplayStage::update_commitment_cache( + arc_bank1.clone(), + 0, + leader_lamports, + &lockouts_sender, + ); let bank2 = Bank::new_from_parent(&arc_bank1, &Pubkey::default(), arc_bank1.slot() + 1); let _res = bank2.transfer(10, &genesis_config_info.mint_keypair, &Pubkey::new_rand()); @@ -1777,7 +1797,12 @@ pub(crate) mod tests { bank_forks.write().unwrap().insert(bank2); let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); leader_vote(&arc_bank2, &leader_voting_pubkey); - ReplayStage::update_commitment_cache(arc_bank2.clone(), leader_lamports, &lockouts_sender); + ReplayStage::update_commitment_cache( + arc_bank2.clone(), + 0, + leader_lamports, + &lockouts_sender, + ); thread::sleep(Duration::from_millis(200)); let mut expected0 = BlockCommitment::default(); diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 5b91b065e78e26..d6a7130bb0e468 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -12,7 +12,7 @@ use solana_faucet::faucet::request_airdrop_transaction; use solana_ledger::{ bank_forks::BankForks, blockstore::Blockstore, rooted_slot_iterator::RootedSlotIterator, }; -use solana_runtime::{bank::Bank, status_cache::SignatureConfirmationStatus}; +use solana_runtime::bank::Bank; use solana_sdk::{ clock::{Slot, UnixTimestamp}, commitment_config::{CommitmentConfig, CommitmentLevel}, @@ -195,11 +195,9 @@ impl JsonRpcRequestProcessor { match signature { Err(e) => Err(e), Ok(sig) => { - let status = bank.get_signature_confirmation_status(&sig); + let status = bank.get_signature_status(&sig); match status { - Some(SignatureConfirmationStatus { status, .. }) => { - new_response(bank, status.is_ok()) - } + Some(status) => new_response(bank, status.is_ok()), None => new_response(bank, false), } } @@ -403,15 +401,14 @@ impl JsonRpcRequestProcessor { signature: Signature, commitment: Option, ) -> Option { - self.bank(commitment) - .get_signature_confirmation_status(&signature) + self.get_transaction_status(signature, &self.bank(commitment)) .map( - |SignatureConfirmationStatus { - confirmations, + |TransactionStatus { status, + confirmations, .. }| RpcSignatureConfirmation { - confirmations, + confirmations: confirmations.unwrap_or(MAX_LOCKOUT_HISTORY + 1), status, }, ) @@ -435,21 +432,7 @@ impl JsonRpcRequestProcessor { let bank = self.bank(commitment); for signature in signatures { - let status = bank.get_signature_confirmation_status(&signature).map( - |SignatureConfirmationStatus { - slot, - status, - confirmations, - }| TransactionStatus { - slot, - status, - confirmations: if confirmations <= MAX_LOCKOUT_HISTORY { - Some(confirmations) - } else { - None - }, - }, - ); + let status = self.get_transaction_status(signature, &bank); statuses.push(status); } Ok(Response { @@ -457,6 +440,30 @@ impl JsonRpcRequestProcessor { value: statuses, }) } + + fn get_transaction_status( + &self, + signature: Signature, + bank: &Arc, + ) -> Option { + bank.get_signature_status_slot(&signature) + .map(|(slot, status)| { + let r_block_commitment_cache = self.block_commitment_cache.read().unwrap(); + + let confirmations = if r_block_commitment_cache.root() >= slot { + None + } else { + r_block_commitment_cache + .get_confirmation_count(slot) + .or(Some(0)) + }; + TransactionStatus { + slot, + status, + confirmations, + } + }) + } } fn get_tpu_addr(cluster_info: &Arc>) -> Result { @@ -1295,18 +1302,6 @@ pub mod tests { ) -> RpcHandler { let (bank_forks, alice, leader_vote_keypair) = new_bank_forks(); let bank = bank_forks.read().unwrap().working_bank(); - - let commitment_slot0 = BlockCommitment::new([8; MAX_LOCKOUT_HISTORY]); - let commitment_slot1 = BlockCommitment::new([9; MAX_LOCKOUT_HISTORY]); - let mut block_commitment: HashMap = HashMap::new(); - block_commitment - .entry(0) - .or_insert(commitment_slot0.clone()); - block_commitment - .entry(1) - .or_insert(commitment_slot1.clone()); - let block_commitment_cache = - Arc::new(RwLock::new(BlockCommitmentCache::new(block_commitment, 42))); let ledger_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); @@ -1322,6 +1317,24 @@ pub mod tests { blockstore.clone(), ); + let mut commitment_slot0 = BlockCommitment::default(); + commitment_slot0.increase_confirmation_stake(2, 9); + let mut commitment_slot1 = BlockCommitment::default(); + commitment_slot1.increase_confirmation_stake(1, 9); + let mut block_commitment: HashMap = HashMap::new(); + block_commitment + .entry(0) + .or_insert(commitment_slot0.clone()); + block_commitment + .entry(1) + .or_insert(commitment_slot1.clone()); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( + block_commitment, + 10, + bank.clone(), + 0, + ))); + // Add timestamp vote to blockstore let vote = Vote { slots: vec![1], @@ -1913,7 +1926,9 @@ pub mod tests { let result: Option = serde_json::from_value(json["result"]["value"][0].clone()) .expect("actual response deserialization"); - assert_eq!(expected_res, result.as_ref().unwrap().status); + let result = result.as_ref().unwrap(); + assert_eq!(expected_res, result.status); + assert_eq!(None, result.confirmations); // Test getSignatureStatus request on unprocessed tx let tx = system_transaction::transfer(&alice, &bob_pubkey, 10, blockhash); @@ -2262,6 +2277,8 @@ pub mod tests { fn test_rpc_processor_get_block_commitment() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); + let bank_forks = new_bank_forks().0; + let commitment_slot0 = BlockCommitment::new([8; MAX_LOCKOUT_HISTORY]); let commitment_slot1 = BlockCommitment::new([9; MAX_LOCKOUT_HISTORY]); let mut block_commitment: HashMap = HashMap::new(); @@ -2271,8 +2288,12 @@ pub mod tests { block_commitment .entry(1) .or_insert(commitment_slot1.clone()); - let block_commitment_cache = - Arc::new(RwLock::new(BlockCommitmentCache::new(block_commitment, 42))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new( + block_commitment, + 42, + bank_forks.read().unwrap().working_bank(), + 0, + ))); let ledger_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&ledger_path).unwrap(); @@ -2280,7 +2301,7 @@ pub mod tests { config.enable_validator_exit = true; let request_processor = JsonRpcRequestProcessor::new( config, - new_bank_forks().0, + bank_forks, block_commitment_cache, Arc::new(blockstore), StorageState::default(), @@ -2344,7 +2365,7 @@ pub mod tests { .get_block_commitment(0) .map(|block_commitment| block_commitment.commitment) ); - assert_eq!(total_stake, 42); + assert_eq!(total_stake, 10); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getBlockCommitment","params":[2]}}"#); @@ -2362,7 +2383,7 @@ pub mod tests { panic!("Expected single response"); }; assert_eq!(commitment_response.commitment, None); - assert_eq!(commitment_response.total_stake, 42); + assert_eq!(commitment_response.total_stake, 10); } #[test] diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 6421ad4482ac22..cb6911d651c6c2 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -312,8 +312,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl { #[cfg(test)] mod tests { use super::*; - use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use crate::rpc_subscriptions::tests::robust_poll_or_panic; + use crate::{ + commitment::{BlockCommitment, BlockCommitmentCache}, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + rpc_subscriptions::tests::robust_poll_or_panic, + }; use jsonrpc_core::{futures::sync::mpsc, Response}; use jsonrpc_pubsub::{PubSubHandler, Session}; use solana_budget_program::{self, budget_instruction}; @@ -325,7 +328,12 @@ mod tests { system_program, system_transaction, transaction::{self, Transaction}, }; - use std::{sync::RwLock, thread::sleep, time::Duration}; + use std::{ + collections::HashMap, + sync::{atomic::AtomicBool, RwLock}, + thread::sleep, + time::Duration, + }; fn process_transaction_and_notify( bank_forks: &Arc>, @@ -358,8 +366,13 @@ mod tests { let bank = Bank::new(&genesis_config); let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); - - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl { + subscriptions: Arc::new(RpcSubscriptions::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )), + ..RpcSolPubSubImpl::default() + }; // Test signature subscriptions let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash); @@ -457,7 +470,13 @@ mod tests { let blockhash = bank.last_blockhash(); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); - let rpc = RpcSolPubSubImpl::default(); + let rpc = RpcSolPubSubImpl { + subscriptions: Arc::new(RpcSubscriptions::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )), + ..RpcSolPubSubImpl::default() + }; let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe( @@ -591,7 +610,13 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); - let rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default(); + let exit = Arc::new(AtomicBool::new(false)); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); + rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); @@ -622,7 +647,12 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank))); let bob = Keypair::new(); - let rpc = RpcSolPubSubImpl::default(); + let mut rpc = RpcSolPubSubImpl::default(); + let exit = Arc::new(AtomicBool::new(false)); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())); + + let subscriptions = RpcSubscriptions::new(&exit, block_commitment_cache.clone()); + rpc.subscriptions = Arc::new(subscriptions); let session = create_session(); let (subscriber, _id_receiver, receiver) = Subscriber::new_test("accountNotification"); rpc.account_subscribe(session, subscriber, bob.pubkey().to_string(), Some(2)); @@ -640,10 +670,32 @@ mod tests { let bank0 = bank_forks.read().unwrap()[0].clone(); let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); bank_forks.write().unwrap().insert(bank1); - rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank1 = bank_forks.read().unwrap()[1].clone(); + + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(1, 10); + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + let mut new_block_commitment = + BlockCommitmentCache::new(block_commitment, 10, bank1.clone(), 0); + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + drop(w_block_commitment_cache); + + rpc.subscriptions.notify_subscribers(1, &bank_forks); let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); bank_forks.write().unwrap().insert(bank2); + let bank2 = bank_forks.read().unwrap()[2].clone(); + + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(2, 10); + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + let mut new_block_commitment = BlockCommitmentCache::new(block_commitment, 10, bank2, 0); + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + drop(w_block_commitment_cache); + rpc.subscriptions.notify_subscribers(2, &bank_forks); let expected = json!({ "jsonrpc": "2.0", diff --git a/core/src/rpc_pubsub_service.rs b/core/src/rpc_pubsub_service.rs index 98b80ee93bab0e..e58bdc775ce7d7 100644 --- a/core/src/rpc_pubsub_service.rs +++ b/core/src/rpc_pubsub_service.rs @@ -1,14 +1,20 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; -use crate::rpc_subscriptions::RpcSubscriptions; +use crate::{ + rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}, + rpc_subscriptions::RpcSubscriptions, +}; use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_ws_server::{RequestContext, ServerBuilder}; -use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Duration; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::Duration, +}; pub struct PubSubService { thread_hdl: JoinHandle<()>, @@ -66,13 +72,20 @@ impl PubSubService { #[cfg(test)] mod tests { use super::*; - use std::net::{IpAddr, Ipv4Addr}; + use crate::commitment::BlockCommitmentCache; + use std::{ + net::{IpAddr, Ipv4Addr}, + sync::RwLock, + }; #[test] fn test_pubsub_new() { let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 8d02bde22e686a..7ad24a9748de04 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,5 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request +use crate::commitment::BlockCommitmentCache; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ @@ -14,11 +15,14 @@ use solana_sdk::{ account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction, }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, +}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; use std::{ + cmp::min, collections::{HashMap, HashSet}, iter, sync::{Arc, Mutex, RwLock}, @@ -80,11 +84,7 @@ fn add_subscription( { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let confirmations = confirmations.unwrap_or(0); - let confirmations = if confirmations > MAX_LOCKOUT_HISTORY { - MAX_LOCKOUT_HISTORY - } else { - confirmations - }; + let confirmations = min(confirmations, MAX_LOCKOUT_HISTORY + 1); if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { current_hashmap.insert(sub_id, (sink, confirmations)); return; @@ -120,8 +120,8 @@ where fn check_confirmations_and_notify( subscriptions: &HashMap>, Confirmations)>>, hashmap_key: &K, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, bank_method: B, filter_results: F, notifier: &RpcNotifier, @@ -133,6 +133,10 @@ where F: Fn(X, u64) -> Box>, X: Clone + Serialize, { + let mut confirmation_slots: HashMap = HashMap::new(); + let r_block_commitment_cache = block_commitment_cache.read().unwrap(); + let current_slot = r_block_commitment_cache.slot(); + let root = r_block_commitment_cache.root(); let current_ancestors = bank_forks .read() .unwrap() @@ -140,27 +144,29 @@ where .unwrap() .ancestors .clone(); + for (slot, _) in current_ancestors.iter() { + if let Some(confirmations) = r_block_commitment_cache.get_confirmation_count(*slot) { + confirmation_slots.entry(confirmations).or_insert(*slot); + } + } + drop(r_block_commitment_cache); let mut notified_set: HashSet = HashSet::new(); if let Some(hashmap) = subscriptions.get(hashmap_key) { for (sub_id, (sink, confirmations)) in hashmap.iter() { - let desired_slot: Vec = current_ancestors - .iter() - .filter(|(_, &v)| v == *confirmations) - .map(|(k, _)| k) - .cloned() - .collect(); - let root: Vec = current_ancestors - .iter() - .filter(|(_, &v)| v == 32) - .map(|(k, _)| k) - .cloned() - .collect(); - let root = if root.len() == 1 { root[0] } else { 0 }; - if desired_slot.len() == 1 { - let slot = desired_slot[0]; - let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone(); - let results = bank_method(&desired_bank, hashmap_key); + let desired_slot = if *confirmations == 0 { + Some(¤t_slot) + } else if *confirmations == MAX_LOCKOUT_HISTORY + 1 { + Some(&root) + } else { + confirmation_slots.get(confirmations) + }; + if let Some(&slot) = desired_slot { + let results = { + let bank_forks = bank_forks.read().unwrap(); + let desired_bank = bank_forks.get(slot).unwrap(); + bank_method(&desired_bank, hashmap_key) + }; for result in filter_results(results, root) { notifier.notify( Response { @@ -236,7 +242,10 @@ pub struct RpcSubscriptions { impl Default for RpcSubscriptions { fn default() -> Self { - Self::new(&Arc::new(AtomicBool::new(false))) + Self::new( + &Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + ) } } @@ -249,7 +258,10 @@ impl Drop for RpcSubscriptions { } impl RpcSubscriptions { - pub fn new(exit: &Arc) -> Self { + pub fn new( + exit: &Arc, + block_commitment_cache: Arc>, + ) -> Self { let (notification_sender, notification_receiver): ( Sender, Receiver, @@ -288,6 +300,7 @@ impl RpcSubscriptions { signature_subscriptions_clone, slot_subscriptions_clone, root_subscriptions_clone, + block_commitment_cache, ); }) .unwrap(); @@ -307,8 +320,8 @@ impl RpcSubscriptions { fn check_account( pubkey: &Pubkey, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, account_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -316,8 +329,8 @@ impl RpcSubscriptions { check_confirmations_and_notify( &subscriptions, pubkey, - current_slot, bank_forks, + block_commitment_cache, Bank::get_account_modified_since_parent, filter_account_result, notifier, @@ -326,8 +339,8 @@ impl RpcSubscriptions { fn check_program( program_id: &Pubkey, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, program_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -335,8 +348,8 @@ impl RpcSubscriptions { check_confirmations_and_notify( &subscriptions, program_id, - current_slot, bank_forks, + block_commitment_cache, Bank::get_program_accounts_modified_since_parent, filter_program_results, notifier, @@ -345,8 +358,8 @@ impl RpcSubscriptions { fn check_signature( signature: &Signature, - current_slot: Slot, bank_forks: &Arc>, + block_commitment_cache: &Arc>, signature_subscriptions: Arc, notifier: &RpcNotifier, ) { @@ -354,8 +367,8 @@ impl RpcSubscriptions { let notified_ids = check_confirmations_and_notify( &subscriptions, signature, - current_slot, bank_forks, + block_commitment_cache, Bank::get_signature_status_processed_since_parent, filter_signature_result, notifier, @@ -499,6 +512,7 @@ impl RpcSubscriptions { signature_subscriptions: Arc, slot_subscriptions: Arc, root_subscriptions: Arc, + block_commitment_cache: Arc>, ) { loop { if exit.load(Ordering::Relaxed) { @@ -518,7 +532,7 @@ impl RpcSubscriptions { notifier.notify(root, sink); } } - NotificationEntry::Bank((current_slot, bank_forks)) => { + NotificationEntry::Bank((_current_slot, bank_forks)) => { let pubkeys: Vec<_> = { let subs = account_subscriptions.read().unwrap(); subs.keys().cloned().collect() @@ -526,8 +540,8 @@ impl RpcSubscriptions { for pubkey in &pubkeys { Self::check_account( pubkey, - current_slot, &bank_forks, + &block_commitment_cache, account_subscriptions.clone(), ¬ifier, ); @@ -540,8 +554,8 @@ impl RpcSubscriptions { for program_id in &programs { Self::check_program( program_id, - current_slot, &bank_forks, + &block_commitment_cache, program_subscriptions.clone(), ¬ifier, ); @@ -554,8 +568,8 @@ impl RpcSubscriptions { for signature in &signatures { Self::check_signature( signature, - current_slot, &bank_forks, + &block_commitment_cache, signature_subscriptions.clone(), ¬ifier, ); @@ -596,7 +610,10 @@ impl RpcSubscriptions { #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use crate::{ + commitment::BlockCommitment, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }; use jsonrpc_core::futures::{self, stream::Stream}; use jsonrpc_pubsub::typed::Subscriber; use solana_budget_program; @@ -663,7 +680,10 @@ pub(crate) mod tests { Subscriber::new_test("accountNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_account_subscription(alice.pubkey(), None, sub_id.clone(), subscriber); assert!(subscriptions @@ -732,7 +752,10 @@ pub(crate) mod tests { Subscriber::new_test("programNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_program_subscription( solana_budget_program::id(), None, @@ -812,27 +835,41 @@ pub(crate) mod tests { .unwrap() .process_transaction(&processed_tx) .unwrap(); + let bank1 = bank_forks[1].clone(); let bank_forks = Arc::new(RwLock::new(bank_forks)); + let mut cache0 = BlockCommitment::default(); + cache0.increase_confirmation_stake(1, 10); + let cache1 = BlockCommitment::default(); + + let mut block_commitment = HashMap::new(); + block_commitment.entry(0).or_insert(cache0.clone()); + block_commitment.entry(1).or_insert(cache1.clone()); + let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 10, bank1, 0); + let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = + RpcSubscriptions::new(&exit, Arc::new(RwLock::new(block_commitment_cache))); - let (past_bank_sub, _id_receiver, past_bank_recv) = + let (past_bank_sub1, _id_receiver, past_bank_recv1) = + Subscriber::new_test("signatureNotification"); + let (past_bank_sub2, _id_receiver, past_bank_recv2) = Subscriber::new_test("signatureNotification"); let (processed_sub, _id_receiver, processed_recv) = Subscriber::new_test("signatureNotification"); + subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(0), SubscriptionId::Number(1 as u64), - Subscriber::new_test("signatureNotification").0, + past_bank_sub1, ); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], Some(1), SubscriptionId::Number(2 as u64), - past_bank_sub, + past_bank_sub2, ); subscriptions.add_signature_subscription( processed_tx.signatures[0], @@ -857,41 +894,46 @@ pub(crate) mod tests { subscriptions.notify_subscribers(1, &bank_forks); let expected_res: Option> = Some(Ok(())); - let expected = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": 0 }, - "value": expected_res, - }, - "subscription": 2, - } - }); - let (response, _) = robust_poll_or_panic(past_bank_recv); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); + struct Notification { + slot: Slot, + id: u64, + } - let expected = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": 1 }, - "value": expected_res, - }, - "subscription": 3, - } - }); - let (response, _) = robust_poll_or_panic(processed_recv); - assert_eq!(serde_json::to_string(&expected).unwrap(), response); + let expected_notification = |exp: Notification| -> String { + let json = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": exp.slot }, + "value": &expected_res, + }, + "subscription": exp.id, + } + }); + serde_json::to_string(&json).unwrap() + }; - let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); + // Expect to receive a notification from bank 1 because this subscription is + // looking for 0 confirmations and so checks the current bank + let expected = expected_notification(Notification { slot: 1, id: 1 }); + let (response, _) = robust_poll_or_panic(past_bank_recv1); + assert_eq!(expected, response); + + // Expect to receive a notification from bank 0 because this subscription is + // looking for 1 confirmation and so checks the past bank + let expected = expected_notification(Notification { slot: 0, id: 2 }); + let (response, _) = robust_poll_or_panic(past_bank_recv2); + assert_eq!(expected, response); + + let expected = expected_notification(Notification { slot: 1, id: 3 }); + let (response, _) = robust_poll_or_panic(processed_recv); + assert_eq!(expected, response); // Subscription should be automatically removed after notification + let sig_subs = subscriptions.signature_subscriptions.read().unwrap(); assert!(!sig_subs.contains_key(&processed_tx.signatures[0])); - - // Only one notification is expected for signature processed in previous bank - assert_eq!(sig_subs.get(&past_bank_tx.signatures[0]).unwrap().len(), 1); + assert!(!sig_subs.contains_key(&past_bank_tx.signatures[0])); // Unprocessed signature subscription should not be removed assert_eq!( @@ -906,7 +948,10 @@ pub(crate) mod tests { Subscriber::new_test("slotNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_slot_subscription(sub_id.clone(), subscriber); assert!(subscriptions @@ -944,7 +989,10 @@ pub(crate) mod tests { Subscriber::new_test("rootNotification"); let sub_id = SubscriptionId::Number(0 as u64); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = RpcSubscriptions::new(&exit); + let subscriptions = RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + ); subscriptions.add_root_subscription(sub_id.clone(), subscriber); assert!(subscriptions diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 7f26738a90f47d..556bc7a8fb68af 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -4,6 +4,7 @@ use crate::{ cluster_info::ClusterInfo, + commitment::BlockCommitmentCache, contact_info::ContactInfo, result::{Error, Result}, }; @@ -11,9 +12,7 @@ use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_chacha_cuda::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore}; -use solana_runtime::{ - bank::Bank, status_cache::SignatureConfirmationStatus, storage_utils::archiver_accounts, -}; +use solana_runtime::{bank::Bank, storage_utils::archiver_accounts}; use solana_sdk::{ account::Account, account_utils::StateMut, @@ -30,6 +29,7 @@ use solana_storage_program::{ storage_instruction, storage_instruction::proof_validation, }; +use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ cmp, collections::HashMap, @@ -185,6 +185,7 @@ impl StorageStage { exit: &Arc, bank_forks: &Arc>, cluster_info: &Arc>, + block_commitment_cache: Arc>, ) -> Self { let (instruction_sender, instruction_receiver) = channel(); @@ -256,6 +257,7 @@ impl StorageStage { &keypair, &storage_keypair, &transactions_socket, + &block_commitment_cache, ) .unwrap_or_else(|err| { info!("failed to send storage transaction: {:?}", err) @@ -289,6 +291,7 @@ impl StorageStage { keypair: &Arc, storage_keypair: &Arc, transactions_socket: &UdpSocket, + block_commitment_cache: &Arc>, ) -> io::Result<()> { let working_bank = bank_forks.read().unwrap().working_bank(); let blockhash = working_bank.confirmed_last_blockhash().0; @@ -323,8 +326,13 @@ impl StorageStage { cluster_info.read().unwrap().my_data().tpu, )?; sleep(Duration::from_millis(100)); - if Self::poll_for_signature_confirmation(bank_forks, &transaction.signatures[0], 0) - .is_ok() + if Self::poll_for_signature_confirmation( + bank_forks, + block_commitment_cache, + &transaction.signatures[0], + 0, + ) + .is_ok() { break; }; @@ -334,23 +342,24 @@ impl StorageStage { fn poll_for_signature_confirmation( bank_forks: &Arc>, + block_commitment_cache: &Arc>, signature: &Signature, min_confirmed_blocks: usize, ) -> Result<()> { let mut now = Instant::now(); let mut confirmed_blocks = 0; loop { - let response = bank_forks - .read() - .unwrap() - .working_bank() - .get_signature_confirmation_status(signature); - if let Some(SignatureConfirmationStatus { - confirmations, - status, - .. - }) = response - { + let working_bank = bank_forks.read().unwrap().working_bank(); + let response = working_bank.get_signature_status_slot(signature); + if let Some((slot, status)) = response { + let confirmations = if working_bank.src.roots().contains(&slot) { + MAX_LOCKOUT_HISTORY + 1 + } else { + let r_block_commitment_cache = block_commitment_cache.read().unwrap(); + r_block_commitment_cache + .get_confirmation_count(slot) + .unwrap_or(0) + }; if status.is_ok() { if confirmed_blocks != confirmations { now = Instant::now(); @@ -655,12 +664,18 @@ mod tests { use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; use rayon::prelude::*; use solana_runtime::bank::Bank; - use solana_sdk::hash::Hasher; - use solana_sdk::signature::{Keypair, Signer}; - use std::cmp::{max, min}; - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use solana_sdk::{ + hash::Hasher, + signature::{Keypair, Signer}, + }; + use std::{ + cmp::{max, min}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + mpsc::channel, + Arc, RwLock, + }, + }; #[test] fn test_storage_stage_none_ledger() { @@ -675,6 +690,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let (_slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new( &bank.last_blockhash(), @@ -690,6 +706,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d49531a13b03a0..aa0bc5eff07f36 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -182,7 +182,7 @@ impl Tvu { slot_full_senders: vec![blockstream_slot_sender], latest_root_senders: vec![ledger_cleanup_slot_sender], accounts_hash_sender: Some(accounts_hash_sender), - block_commitment_cache, + block_commitment_cache: block_commitment_cache.clone(), transaction_status_sender, rewards_recorder_sender, }; @@ -226,6 +226,7 @@ impl Tvu { &exit, &bank_forks, &cluster_info, + block_commitment_cache, ); Tvu { @@ -314,7 +315,10 @@ pub mod tests { &StorageState::default(), None, l_receiver, - &Arc::new(RpcSubscriptions::new(&exit)), + &Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )), &poh_recorder, &leader_schedule_cache, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index 8138dc738d017f..98959f3b73d1c3 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -239,7 +239,7 @@ impl Validator { let blockstore = Arc::new(blockstore); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())); let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { if ContactInfo::is_valid_address(&node.info.rpc) { diff --git a/core/tests/client.rs b/core/tests/client.rs index be06e8421754b3..bb9c456af788b2 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -3,8 +3,8 @@ use solana_client::{ rpc_client::RpcClient, }; use solana_core::{ - rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, - validator::TestValidator, + commitment::BlockCommitmentCache, rpc_pubsub_service::PubSubService, + rpc_subscriptions::RpcSubscriptions, validator::TestValidator, }; use solana_sdk::{ commitment_config::CommitmentConfig, pubkey::Pubkey, rpc_port, signature::Signer, @@ -15,7 +15,7 @@ use std::{ net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, RwLock, }, thread::sleep, time::{Duration, Instant}, @@ -85,7 +85,10 @@ fn test_slot_subscription() { rpc_port::DEFAULT_RPC_PUBSUB_PORT, ); let exit = Arc::new(AtomicBool::new(false)); - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + )); let pubsub_service = PubSubService::new(&subscriptions, pubsub_addr, &exit); std::thread::sleep(Duration::from_millis(400)); diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index 708734641a4f4d..bc2354b84bf56e 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -3,28 +3,35 @@ #[cfg(test)] mod tests { use log::*; - use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; - use solana_core::storage_stage::{StorageStage, StorageState}; - use solana_ledger::bank_forks::BankForks; - use solana_ledger::blockstore_processor; - use solana_ledger::entry; - use solana_ledger::{blockstore::Blockstore, create_new_tmp_ledger}; + use solana_core::{ + commitment::BlockCommitmentCache, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + storage_stage::{test_cluster_info, StorageStage, StorageState, SLOTS_PER_TURN_TEST}, + }; + use solana_ledger::{ + bank_forks::BankForks, blockstore::Blockstore, blockstore_processor, create_new_tmp_ledger, + entry, + }; use solana_runtime::bank::Bank; - use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; - use solana_sdk::hash::Hash; - use solana_sdk::message::Message; - use solana_sdk::pubkey::Pubkey; - use solana_sdk::signature::{Keypair, Signer}; - use solana_sdk::transaction::Transaction; - use solana_storage_program::storage_instruction; - use solana_storage_program::storage_instruction::StorageAccountType; - use std::fs::remove_dir_all; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; + use solana_sdk::{ + clock::DEFAULT_TICKS_PER_SLOT, + hash::Hash, + message::Message, + pubkey::Pubkey, + signature::{Keypair, Signer}, + transaction::Transaction, + }; + use solana_storage_program::storage_instruction::{self, StorageAccountType}; + use std::{ + fs::remove_dir_all, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + Arc, RwLock, + }, + thread::sleep, + time::Duration, + }; #[test] fn test_storage_stage_process_account_proofs() { @@ -52,6 +59,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -69,6 +77,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); bank_sender.send(vec![bank.clone()]).unwrap(); @@ -171,6 +180,7 @@ mod tests { &[bank.clone()], vec![0], ))); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -188,6 +198,7 @@ mod tests { &exit.clone(), &bank_forks, &cluster_info, + block_commitment_cache, ); bank_sender.send(vec![bank.clone()]).unwrap(); diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index faf1bbffce7583..23531a39f18b19 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -119,7 +119,7 @@ Many methods that take a commitment parameter return an RpcResponse JSON object ### confirmTransaction -Returns a transaction receipt +Returns a transaction receipt. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots. #### Parameters: @@ -657,7 +657,7 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m ### getSignatureStatus -Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. +Returns the status of a given signature. This method is similar to [confirmTransaction](jsonrpc-api.md#confirmtransaction) but provides more resolution for error events. This method only searches the recent status cache of signatures, which retains all active slots plus `MAX_RECENT_BLOCKHASHES` rooted slots. #### Parameters: @@ -690,7 +690,6 @@ Returns the statuses of a list of signatures. This method is similar to [confirm * `` - An array of transaction signatures to confirm, as base-58 encoded strings * `` - (optional) Extended Rpc configuration, containing the following optional fields: * `commitment: ` - [Commitment](jsonrpc-api.md#configuring-state-commitment) - * `searchTransactionHistory: ` - whether to search the ledger transaction status cache, which may be expensive #### Results: diff --git a/runtime/src/bank/mod.rs b/runtime/src/bank/mod.rs index 36af55ace36493..d426dec397e725 100644 --- a/runtime/src/bank/mod.rs +++ b/runtime/src/bank/mod.rs @@ -14,7 +14,7 @@ use crate::{ deserialize_atomicbool, deserialize_atomicu64, serialize_atomicbool, serialize_atomicu64, }, stakes::Stakes, - status_cache::{SignatureConfirmationStatus, SlotDelta, StatusCache}, + status_cache::{SlotDelta, StatusCache}, storage_utils, storage_utils::StorageAccounts, system_instruction_processor::{get_system_account_kind, SystemAccountKind}, @@ -1841,29 +1841,25 @@ impl Bank { &self, signature: &Signature, ) -> Option> { - if let Some(status) = self.get_signature_confirmation_status(signature) { - if status.slot == self.slot() { - return Some(status.status); + if let Some((slot, status)) = self.get_signature_status_slot(signature) { + if slot <= self.slot() { + return Some(status); } } None } - pub fn get_signature_confirmation_status( - &self, - signature: &Signature, - ) -> Option>> { + pub fn get_signature_status_slot(&self, signature: &Signature) -> Option<(Slot, Result<()>)> { let rcache = self.src.status_cache.read().unwrap(); - rcache.get_signature_status_slow(signature, &self.ancestors) + rcache.get_signature_slot(signature, &self.ancestors) } pub fn get_signature_status(&self, signature: &Signature) -> Option> { - self.get_signature_confirmation_status(signature) - .map(|v| v.status) + self.get_signature_status_slot(signature).map(|v| v.1) } pub fn has_signature(&self, signature: &Signature) -> bool { - self.get_signature_confirmation_status(signature).is_some() + self.get_signature_status_slot(signature).is_some() } /// Hash the `accounts` HashMap. This represents a validator's interpretation diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 9346359c151704..fa9731c2e41e28 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -1,4 +1,4 @@ -use crate::{bank::Bank, status_cache::SignatureConfirmationStatus}; +use crate::bank::Bank; use solana_sdk::{ account::Account, client::{AsyncClient, Client, SyncClient}, @@ -184,26 +184,15 @@ impl SyncClient for BankClient { signature: &Signature, min_confirmed_blocks: usize, ) -> Result { - let mut now = Instant::now(); - let mut confirmed_blocks = 0; + // https://github.com/solana-labs/solana/issues/7199 + assert_eq!(min_confirmed_blocks, 1, "BankClient cannot observe the passage of multiple blocks, so min_confirmed_blocks must be 1"); + let now = Instant::now(); + let confirmed_blocks; loop { - let response = self.bank.get_signature_confirmation_status(signature); - if let Some(SignatureConfirmationStatus { - confirmations, - status, - .. - }) = response - { - if status.is_ok() { - if confirmed_blocks != confirmations { - now = Instant::now(); - confirmed_blocks = confirmations; - } - if confirmations >= min_confirmed_blocks { - break; - } - } - }; + if self.bank.get_signature_status(signature).is_some() { + confirmed_blocks = 1; + break; + } if now.elapsed().as_secs() > 15 { return Err(TransportError::IoError(io::Error::new( io::ErrorKind::Other, diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index d7c0961209bcf8..b6ae088965ec3e 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -103,29 +103,20 @@ impl StatusCache { None } - pub fn get_signature_status_slow( + pub fn get_signature_slot( &self, - sig: &Signature, + signature: &Signature, ancestors: &HashMap, - ) -> Option> { - trace!("get_signature_status_slow"); + ) -> Option<(Slot, T)> { let mut keys = vec![]; let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect(); keys.append(&mut val); for blockhash in keys.iter() { - trace!("get_signature_status_slow: trying {}", blockhash); - if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) { - trace!("get_signature_status_slow: got {}", forkid); - let confirmations = ancestors - .get(&forkid) - .copied() - .unwrap_or_else(|| ancestors.len()); - return Some(SignatureConfirmationStatus { - slot: forkid, - confirmations, - status: res, - }); + trace!("get_signature_slot: trying {}", blockhash); + let status = self.get_signature_status(signature, blockhash, ancestors); + if status.is_some() { + return status; } } None @@ -265,10 +256,7 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &HashMap::new()), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &HashMap::new()), - None - ); + assert_eq!(status_cache.get_signature_slot(&sig, &HashMap::new()), None); } #[test] @@ -283,12 +271,8 @@ mod tests { Some((0, ())) ); assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - Some(SignatureConfirmationStatus { - slot: 0, - confirmations: 1, - status: () - }) + status_cache.get_signature_slot(&sig, &ancestors), + Some((0, ())) ); } @@ -303,10 +287,7 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &ancestors), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - None - ); + assert_eq!(status_cache.get_signature_slot(&sig, &ancestors), None); } #[test] @@ -323,24 +304,6 @@ mod tests { ); } - #[test] - fn test_find_sig_with_root_ancestor_fork_max_len() { - let sig = Signature::default(); - let mut status_cache = BankStatusCache::default(); - let blockhash = hash(Hash::default().as_ref()); - let ancestors = vec![(2, 2)].into_iter().collect(); - status_cache.insert(&blockhash, &sig, 0, ()); - status_cache.add_root(0); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - Some(SignatureConfirmationStatus { - slot: 0, - confirmations: ancestors.len(), - status: () - }) - ); - } - #[test] fn test_insert_picks_latest_blockhash_fork() { let sig = Signature::default(); @@ -371,10 +334,6 @@ mod tests { status_cache.get_signature_status(&sig, &blockhash, &ancestors), None ); - assert_eq!( - status_cache.get_signature_status_slow(&sig, &ancestors), - None - ); } #[test]