From e663f058a9dc4e55448c37574f7f29ed5a04d6fb Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 31 Aug 2020 13:28:13 -0600 Subject: [PATCH 1/6] Add blockstore column to cache block times --- ledger/src/blockstore.rs | 3 ++ ledger/src/blockstore/blockstore_purge.rs | 8 ++++++ ledger/src/blockstore_db.rs | 34 +++++++++++++++++++---- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e4c87516de2..f8d020eb2f1 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -137,6 +137,7 @@ pub struct Blockstore { transaction_status_index_cf: LedgerColumn, active_transaction_status_index: RwLock, rewards_cf: LedgerColumn, + blocktime_cf: LedgerColumn, last_root: Arc>, insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, @@ -292,6 +293,7 @@ impl Blockstore { let address_signatures_cf = db.column(); let transaction_status_index_cf = db.column(); let rewards_cf = db.column(); + let blocktime_cf = db.column(); let db = Arc::new(db); @@ -336,6 +338,7 @@ impl Blockstore { transaction_status_index_cf, active_transaction_status_index: RwLock::new(active_transaction_status_index), rewards_cf, + blocktime_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], insert_shreds_lock: Arc::new(Mutex::new(())), diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 649fc5f5f39..db08c46cbcc 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -133,6 +133,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); let mut w_active_transaction_status_index = self.active_transaction_status_index.write().unwrap(); @@ -223,6 +227,10 @@ impl Blockstore { && self .rewards_cf .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .blocktime_cf + .compact_range(from_slot, to_slot) .unwrap_or(false); compact_timer.stop(); if !result { diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index f95e8c95f22..7d1ec9833e1 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -10,7 +10,11 @@ use rocksdb::{ use serde::de::DeserializeOwned; use serde::Serialize; use solana_runtime::hardened_unpack::UnpackError; -use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; +use solana_sdk::{ + clock::{Slot, UnixTimestamp}, + pubkey::Pubkey, + signature::Signature, +}; use solana_transaction_status::{Rewards, TransactionStatusMeta}; use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc}; use thiserror::Error; @@ -46,6 +50,8 @@ const ADDRESS_SIGNATURES_CF: &str = "address_signatures"; const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index"; /// Column family for Rewards const REWARDS_CF: &str = "rewards"; +/// Column family for Blocktime +const BLOCKTIME_CF: &str = "blocktime"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -128,6 +134,10 @@ pub mod columns { #[derive(Debug)] /// The rewards column pub struct Rewards; + + #[derive(Debug)] + /// The blocktime column + pub struct Blocktime; } pub enum AccessType { @@ -187,8 +197,9 @@ impl Rocks { recovery_mode: Option, ) -> Result { use columns::{ - AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, - Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, + AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, + Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, + TransactionStatusIndex, }; fs::create_dir_all(&path)?; @@ -221,6 +232,8 @@ impl Rocks { let transaction_status_index_cf_descriptor = ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options()); let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); + let blocktime_cf_descriptor = + ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options()); let cfs = vec![ (SlotMeta::NAME, meta_cf_descriptor), @@ -239,6 +252,7 @@ impl Rocks { transaction_status_index_cf_descriptor, ), (Rewards::NAME, rewards_cf_descriptor), + (Blocktime::NAME, blocktime_cf_descriptor), ]; // Open the database @@ -276,8 +290,9 @@ impl Rocks { fn columns(&self) -> Vec<&'static str> { use columns::{ - AddressSignatures, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, - Root, ShredCode, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex, + AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, + Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus, + TransactionStatusIndex, }; vec![ @@ -294,6 +309,7 @@ impl Rocks { AddressSignatures::NAME, TransactionStatusIndex::NAME, Rewards::NAME, + Blocktime::NAME, ] } @@ -519,6 +535,14 @@ impl TypedColumn for columns::Rewards { type Type = Rewards; } +impl SlotColumn for columns::Blocktime {} +impl ColumnName for columns::Blocktime { + const NAME: &'static str = BLOCKTIME_CF; +} +impl TypedColumn for columns::Blocktime { + type Type = UnixTimestamp; +} + impl Column for columns::ShredCode { type Index = (u64, u64); From 317eaf4af93d1b4865343fd5bff4832101c0db66 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 31 Aug 2020 14:28:03 -0600 Subject: [PATCH 2/6] Add method to cache block time --- ledger/src/blockstore.rs | 47 +++++++++++++++++++++++++++++++++++++ ledger/src/blockstore_db.rs | 1 + 2 files changed, 48 insertions(+) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f8d020eb2f1..d562ba530a7 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1651,6 +1651,53 @@ impl Blockstore { slots } + pub fn cache_block_time( + &self, + slot: Slot, + slot_duration: Duration, + stakes: &HashMap, + ) -> Result<()> { + if !self.is_root(slot) { + return Err(BlockstoreError::SlotNotRooted); + } + let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); + let root_iterator = self + .db + .iter::(IteratorMode::From(slot, IteratorDirection::Reverse)); + let mut timestamp_slots: Vec = root_iterator + .unwrap() + .map(|(iter_slot, _)| iter_slot) + .take(TIMESTAMP_SLOT_RANGE) + .collect(); + timestamp_slots.sort(); + let unique_timestamps: HashMap = timestamp_slots + .into_iter() + .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) + .collect(); + get_unique_timestamps.stop(); + + let mut calculate_timestamp = Measure::start("calculate_timestamp"); + let stake_weighted_timestamp = + calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration) + .ok_or(BlockstoreError::EmptyEpochStakes)?; + calculate_timestamp.stop(); + datapoint_info!( + "blockstore-get-block-time", + ("slot", slot as i64, i64), + ( + "get_unique_timestamps_us", + get_unique_timestamps.as_us() as i64, + i64 + ), + ( + "calculate_stake_weighted_timestamp_us", + calculate_timestamp.as_us() as i64, + i64 + ) + ); + self.blocktime_cf.put(slot, &stake_weighted_timestamp) + } + pub fn get_first_available_block(&self) -> Result { let mut root_iterator = self.rooted_slot_iterator(self.lowest_slot())?; Ok(root_iterator.next().unwrap_or_default()) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 7d1ec9833e1..130a92bc87c 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -67,6 +67,7 @@ pub enum BlockstoreError { UnpackError(#[from] UnpackError), UnableToSetOpenFileDescriptorLimit, TransactionStatusSlotMismatch, + EmptyEpochStakes, } pub type Result = std::result::Result; From a3a6131de1a1bbfc6d6a288da5dc130e57a4fec3 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 31 Aug 2020 15:22:02 -0600 Subject: [PATCH 3/6] Add service to cache block time --- core/src/cache_block_time_service.rs | 65 ++++++++++++++++++++++++++++ core/src/lib.rs | 1 + core/src/replay_stage.rs | 41 ++++++++++++++++++ core/src/tvu.rs | 4 ++ core/src/validator.rs | 22 ++++++++++ ledger/src/blockstore.rs | 17 ++++++++ 6 files changed, 150 insertions(+) create mode 100644 core/src/cache_block_time_service.rs diff --git a/core/src/cache_block_time_service.rs b/core/src/cache_block_time_service.rs new file mode 100644 index 00000000000..db4a48f27a9 --- /dev/null +++ b/core/src/cache_block_time_service.rs @@ -0,0 +1,65 @@ +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_ledger::blockstore::Blockstore; +use solana_runtime::bank::Bank; +use solana_sdk::timing::slot_duration_from_slots_per_year; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub type CacheBlockTimeReceiver = Receiver>; +pub type CacheBlockTimeSender = Sender>; + +pub struct CacheBlockTimeService { + thread_hdl: JoinHandle<()>, +} + +impl CacheBlockTimeService { + #[allow(clippy::new_ret_no_self)] + pub fn new( + cache_block_time_receiver: CacheBlockTimeReceiver, + blockstore: Arc, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-cache-block-time".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(RecvTimeoutError::Disconnected) = + Self::cache_block_time(&cache_block_time_receiver, &blockstore) + { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn cache_block_time( + cache_block_time_receiver: &CacheBlockTimeReceiver, + blockstore: &Arc, + ) -> Result<(), RecvTimeoutError> { + let bank = cache_block_time_receiver.recv_timeout(Duration::from_secs(1))?; + let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); + let epoch = bank.epoch_schedule().get_epoch(bank.slot()); + let stakes = HashMap::new(); + let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes); + + if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) { + error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e); + } + Ok(()) + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 03d621674c0..042b0ef7402 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,6 +11,7 @@ pub mod accounts_hash_verifier; pub mod banking_stage; pub mod bigtable_upload_service; pub mod broadcast_stage; +pub mod cache_block_time_service; pub mod cluster_info_vote_listener; pub mod commitment_service; pub mod completed_data_sets_service; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9277720ea6e..4c5e7b1bbec 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3,6 +3,7 @@ use crate::{ bank_weight_fork_choice::BankWeightForkChoice, broadcast_stage::RetransmitSlotsSender, + cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, @@ -106,6 +107,7 @@ pub struct ReplayStageConfig { pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, + pub cache_block_time_sender: Option, } #[derive(Default)] @@ -235,6 +237,7 @@ impl ReplayStage { block_commitment_cache, transaction_status_sender, rewards_recorder_sender, + cache_block_time_sender, } = config; trace!("replay stage"); @@ -494,6 +497,7 @@ impl ReplayStage { &subscriptions, &block_commitment_cache, &mut heaviest_subtree_fork_choice, + &cache_block_time_sender, )?; }; voting_time.stop(); @@ -1004,6 +1008,7 @@ impl ReplayStage { subscriptions: &Arc, block_commitment_cache: &Arc>, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + cache_block_time_sender: &Option, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1029,6 +1034,12 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); + Self::cache_block_times( + blockstore, + bank_forks, + &rooted_slots, + cache_block_time_sender, + ); let highest_confirmed_root = Some( block_commitment_cache .read() @@ -1855,6 +1866,36 @@ impl ReplayStage { } } + fn cache_block_times( + blockstore: &Arc, + bank_forks: &Arc>, + rooted_slots: &[Slot], + cache_block_time_sender: &Option, + ) { + if let Some(cache_block_time_sender) = cache_block_time_sender { + for slot in rooted_slots { + if blockstore + .get_block_time2(*slot) + .unwrap_or_default() + .is_none() + { + if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) { + cache_block_time_sender + .send(rooted_bank.clone()) + .unwrap_or_else(|err| { + warn!("cache_block_time_sender failed: {:?}", err) + }); + } else { + error!( + "rooted_bank {:?} not available in BankForks; block time not cached", + slot + ); + } + } + } + } + } + pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot { match cluster_type { ClusterType::Development => 0, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 95d35ccc3c1..6b06a8a9b11 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -5,6 +5,7 @@ use crate::{ accounts_background_service::AccountsBackgroundService, accounts_hash_verifier::AccountsHashVerifier, broadcast_stage::RetransmitSlotsSender, + cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_slots::ClusterSlots, @@ -96,6 +97,7 @@ impl Tvu { cfg: Option>, transaction_status_sender: Option, rewards_recorder_sender: Option, + cache_block_time_sender: Option, snapshot_package_sender: Option, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, @@ -191,6 +193,7 @@ impl Tvu { block_commitment_cache, transaction_status_sender, rewards_recorder_sender, + cache_block_time_sender, }; let replay_stage = ReplayStage::new( @@ -327,6 +330,7 @@ pub mod tests { None, None, None, + None, Arc::new(VoteTracker::new(&bank)), retransmit_slots_sender, verified_vote_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index 37d22ccfcbe..4b1184b18dd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -2,6 +2,7 @@ use crate::{ broadcast_stage::BroadcastStageType, + cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService}, cluster_info::{ClusterInfo, Node}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, @@ -149,6 +150,8 @@ struct TransactionHistoryServices { transaction_status_service: Option, rewards_recorder_sender: Option, rewards_recorder_service: Option, + cache_block_time_sender: Option, + cache_block_time_service: Option, } pub struct Validator { @@ -157,6 +160,7 @@ pub struct Validator { rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>, transaction_status_service: Option, rewards_recorder_service: Option, + cache_block_time_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, completed_data_sets_service: CompletedDataSetsService, @@ -244,6 +248,8 @@ impl Validator { transaction_status_service, rewards_recorder_sender, rewards_recorder_service, + cache_block_time_sender, + cache_block_time_service, }, ) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit); @@ -477,6 +483,7 @@ impl Validator { config.enable_partition.clone(), transaction_status_sender.clone(), rewards_recorder_sender, + cache_block_time_sender, snapshot_package_sender, vote_tracker.clone(), retransmit_slots_sender, @@ -523,6 +530,7 @@ impl Validator { rpc_service, transaction_status_service, rewards_recorder_service, + cache_block_time_service, snapshot_packager_service, completed_data_sets_service, tpu, @@ -587,6 +595,10 @@ impl Validator { rewards_recorder_service.join()?; } + if let Some(cache_block_time_service) = self.cache_block_time_service { + cache_block_time_service.join()?; + } + if let Some(s) = self.snapshot_packager_service { s.join()?; } @@ -772,6 +784,14 @@ fn initialize_rpc_transaction_history_services( let rewards_recorder_sender = Some(rewards_recorder_sender); let rewards_recorder_service = Some(RewardsRecorderService::new( rewards_receiver, + blockstore.clone(), + exit, + )); + + let (cache_block_time_sender, cache_block_time_receiver) = unbounded(); + let cache_block_time_sender = Some(cache_block_time_sender); + let cache_block_time_service = Some(CacheBlockTimeService::new( + cache_block_time_receiver, blockstore, exit, )); @@ -780,6 +800,8 @@ fn initialize_rpc_transaction_history_services( transaction_status_service, rewards_recorder_sender, rewards_recorder_service, + cache_block_time_sender, + cache_block_time_service, } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index d562ba530a7..f49a9efcdad 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1651,6 +1651,23 @@ impl Blockstore { slots } + pub fn get_block_time2( + &self, + slot: Slot, + ) -> Result> { + datapoint_info!( + "blockstore-rpc-api", + ("method", "get_block_time".to_string(), String) + ); + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { + return Err(BlockstoreError::SlotCleanedUp); + } + self.blocktime_cf.get(slot) + } + pub fn cache_block_time( &self, slot: Slot, From 733e053ef5c6a2ed5ca15cd82bd258b4d08dd7a1 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 31 Aug 2020 15:42:46 -0600 Subject: [PATCH 4/6] Update rpc getBlockTime to use new method, and refactor blockstore slightly --- core/src/replay_stage.rs | 2 +- core/src/rpc.rs | 22 ++-- ledger/src/blockstore.rs | 202 +++++++++++++++--------------------- ledger/src/blockstore_db.rs | 1 + 4 files changed, 92 insertions(+), 135 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 4c5e7b1bbec..67e8f1655e3 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1875,7 +1875,7 @@ impl ReplayStage { if let Some(cache_block_time_sender) = cache_block_time_sender { for slot in rooted_slots { if blockstore - .get_block_time2(*slot) + .get_block_time(*slot) .unwrap_or_default() .is_none() { diff --git a/core/src/rpc.rs b/core/src/rpc.rs index a191ba036d2..3c1bf0334b5 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -54,7 +54,6 @@ use solana_sdk::{ stake_history::StakeHistory, system_instruction, sysvar::{stake_history, Sysvar}, - timing::slot_duration_from_slots_per_year, transaction::{self, Transaction}, }; use solana_stake_program::stake_state::StakeState; @@ -686,18 +685,7 @@ impl JsonRpcRequestProcessor { .unwrap() .highest_confirmed_root() { - // This calculation currently assumes that bank.slots_per_year will remain unchanged after - // genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being - // queried). If these values will be variable in the future, those timing parameters will - // need to be stored persistently, and the slot_duration calculation will likely need to be - // moved upstream into blockstore. Also, an explicit commitment level will need to be set. - let bank = self.bank(None); - let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); - let epoch = bank.epoch_schedule().get_epoch(slot); - let stakes = HashMap::new(); - let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes); - - let result = self.blockstore.get_block_time(slot, slot_duration, stakes); + let result = self.blockstore.get_block_time(slot); self.check_slot_cleaned_up(&result, slot)?; Ok(result.ok().unwrap_or(None)) } else { @@ -2544,6 +2532,7 @@ pub mod tests { nonce, rpc_port, signature::{Keypair, Signer}, system_program, system_transaction, + timing::slot_duration_from_slots_per_year, transaction::{self, TransactionError}, }; use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage}; @@ -2555,7 +2544,7 @@ pub mod tests { option::COption, solana_sdk::pubkey::Pubkey as SplTokenPubkey, state::AccountState as TokenAccountState, state::Mint, }; - use std::collections::HashMap; + use std::{collections::HashMap, time::Duration}; const TEST_MINT_LAMPORTS: u64 = 1_000_000; const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1; @@ -2656,6 +2645,11 @@ pub mod tests { for root in roots.iter() { bank_forks.write().unwrap().set_root(*root, &None, Some(0)); + let mut stakes = HashMap::new(); + stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default())); + blockstore + .cache_block_time(*root, Duration::from_millis(400), &stakes) + .unwrap(); } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f49a9efcdad..9f8b64889e5 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -78,7 +78,6 @@ thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell = RefCell::ne pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; -const TIMESTAMP_SLOT_INTERVAL: u64 = 4500; const TIMESTAMP_SLOT_RANGE: usize = 16; // An upper bound on maximum number of data shreds we can handle in a slot @@ -1560,12 +1559,7 @@ impl Blockstore { } } - pub fn get_block_time( - &self, - slot: Slot, - slot_duration: Duration, - stakes: &HashMap, - ) -> Result> { + pub fn get_block_time(&self, slot: Slot) -> Result> { datapoint_info!( "blockstore-rpc-api", ("method", "get_block_time".to_string(), String) @@ -1576,96 +1570,30 @@ impl Blockstore { if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { return Err(BlockstoreError::SlotCleanedUp); } - - let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); - let unique_timestamps: HashMap = self - .get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE) - .into_iter() - .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) - .collect(); - get_unique_timestamps.stop(); - - let mut calculate_timestamp = Measure::start("calculate_timestamp"); - let stake_weighted_timestamps = - calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration); - calculate_timestamp.stop(); - datapoint_info!( - "blockstore-get-block-time", - ("slot", slot as i64, i64), - ( - "get_unique_timestamps_us", - get_unique_timestamps.as_us() as i64, - i64 - ), - ( - "calculate_stake_weighted_timestamp_us", - calculate_timestamp.as_us() as i64, - i64 - ) - ); - - Ok(stake_weighted_timestamps) + self.blocktime_cf.get(slot) } - fn get_timestamp_slots( - &self, - slot: Slot, - timestamp_interval: u64, - timestamp_sample_range: usize, - ) -> Vec { - let baseline_slot = slot - (slot % timestamp_interval); - let root_iterator = self.db.iter::(IteratorMode::From( - baseline_slot, - IteratorDirection::Forward, - )); + fn get_timestamp_slots(&self, slot: Slot, timestamp_sample_range: usize) -> Vec { + let root_iterator = self + .db + .iter::(IteratorMode::From(slot, IteratorDirection::Reverse)); if !self.is_root(slot) || root_iterator.is_err() { return vec![]; } let mut get_slots = Measure::start("get_slots"); - let mut slots: Vec = root_iterator + let mut timestamp_slots: Vec = root_iterator .unwrap() .map(|(iter_slot, _)| iter_slot) .take(timestamp_sample_range) - .filter(|&iter_slot| iter_slot <= slot) .collect(); - - if slots.len() < timestamp_sample_range && baseline_slot >= timestamp_interval { - let earlier_baseline = baseline_slot - timestamp_interval; - let earlier_root_iterator = self.db.iter::(IteratorMode::From( - earlier_baseline, - IteratorDirection::Forward, - )); - if let Ok(iterator) = earlier_root_iterator { - slots = iterator - .map(|(iter_slot, _)| iter_slot) - .take(timestamp_sample_range) - .collect(); - } - } + timestamp_slots.sort(); get_slots.stop(); datapoint_info!( "blockstore-get-timestamp-slots", ("slot", slot as i64, i64), ("get_slots_us", get_slots.as_us() as i64, i64) ); - slots - } - - pub fn get_block_time2( - &self, - slot: Slot, - ) -> Result> { - datapoint_info!( - "blockstore-rpc-api", - ("method", "get_block_time".to_string(), String) - ); - let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); - // lowest_cleanup_slot is the last slot that was not cleaned up by - // LedgerCleanupService - if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { - return Err(BlockstoreError::SlotCleanedUp); - } - self.blocktime_cf.get(slot) + timestamp_slots } pub fn cache_block_time( @@ -1678,20 +1606,15 @@ impl Blockstore { return Err(BlockstoreError::SlotNotRooted); } let mut get_unique_timestamps = Measure::start("get_unique_timestamps"); - let root_iterator = self - .db - .iter::(IteratorMode::From(slot, IteratorDirection::Reverse)); - let mut timestamp_slots: Vec = root_iterator - .unwrap() - .map(|(iter_slot, _)| iter_slot) - .take(TIMESTAMP_SLOT_RANGE) - .collect(); - timestamp_slots.sort(); - let unique_timestamps: HashMap = timestamp_slots + let unique_timestamps: HashMap = self + .get_timestamp_slots(slot, TIMESTAMP_SLOT_RANGE) .into_iter() .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) .collect(); get_unique_timestamps.stop(); + if unique_timestamps.is_empty() { + return Err(BlockstoreError::NoVoteTimestampsInRange); + } let mut calculate_timestamp = Measure::start("calculate_timestamp"); let stake_weighted_timestamp = @@ -5628,8 +5551,6 @@ pub mod tests { fn test_get_timestamp_slots() { let timestamp_sample_range = 5; let ticks_per_slot = 5; - // Smaller interval than TIMESTAMP_SLOT_INTERVAL for convenience of building blockstore - let timestamp_interval = 7; /* Build a blockstore with < TIMESTAMP_SLOT_RANGE roots */ @@ -5656,11 +5577,11 @@ pub mod tests { blockstore.set_roots(&[1, 2, 3]).unwrap(); assert_eq!( - blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(2, timestamp_sample_range), vec![0, 1, 2] ); assert_eq!( - blockstore.get_timestamp_slots(3, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(3, timestamp_sample_range), vec![0, 1, 2, 3] ); @@ -5668,14 +5589,13 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); /* - Build a blockstore in the ledger with the following rooted slots: - [0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 14, 15, 16, 17] + Build a blockstore in the ledger with gaps in rooted slot sequence */ let blockstore_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&blockstore_path).unwrap(); blockstore.set_roots(&[0]).unwrap(); - let desired_roots = vec![1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19]; + let desired_roots = vec![1, 2, 3, 5, 6, 8, 11]; let mut last_entry_hash = Hash::default(); for (i, slot) in desired_roots.iter().enumerate() { let parent = { @@ -5696,28 +5616,20 @@ pub mod tests { blockstore.set_roots(&desired_roots).unwrap(); assert_eq!( - blockstore.get_timestamp_slots(2, timestamp_interval, timestamp_sample_range), + blockstore.get_timestamp_slots(2, timestamp_sample_range), vec![0, 1, 2] ); assert_eq!( - blockstore.get_timestamp_slots(6, timestamp_interval, timestamp_sample_range), - vec![0, 1, 2, 3, 4] - ); - assert_eq!( - blockstore.get_timestamp_slots(8, timestamp_interval, timestamp_sample_range), - vec![0, 1, 2, 3, 4] + blockstore.get_timestamp_slots(6, timestamp_sample_range), + vec![1, 2, 3, 5, 6] ); assert_eq!( - blockstore.get_timestamp_slots(13, timestamp_interval, timestamp_sample_range), - vec![8, 9, 10, 11, 12] + blockstore.get_timestamp_slots(8, timestamp_sample_range), + vec![2, 3, 5, 6, 8] ); assert_eq!( - blockstore.get_timestamp_slots(18, timestamp_interval, timestamp_sample_range), - vec![8, 9, 10, 11, 12] - ); - assert_eq!( - blockstore.get_timestamp_slots(19, timestamp_interval, timestamp_sample_range), - vec![14, 16, 17, 18, 19] + blockstore.get_timestamp_slots(11, timestamp_sample_range), + vec![3, 5, 6, 8, 11] ); } @@ -5888,14 +5800,25 @@ pub mod tests { ); assert_eq!(blockstore.get_block_timestamps(2).unwrap(), vec![]); - // Build epoch vote_accounts HashMap to test stake-weighted block time blockstore.set_roots(&[3, 8]).unwrap(); let mut stakes = HashMap::new(); + let slot_duration = Duration::from_millis(400); + for slot in &[1, 2, 3, 8] { + assert!(blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .is_err()); + } + + // Build epoch vote_accounts HashMap to test stake-weighted block time for (i, keypair) in vote_keypairs.iter().enumerate() { stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default())); } - let slot_duration = Duration::from_millis(400); - let block_time_slot_3 = blockstore.get_block_time(3, slot_duration, &stakes); + for slot in &[1, 2, 3, 8] { + blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .unwrap(); + } + let block_time_slot_3 = blockstore.get_block_time(3); let mut total_stake = 0; let mut expected_time: u64 = (0..6) @@ -5911,14 +5834,53 @@ pub mod tests { expected_time /= total_stake; assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time); assert_eq!( - blockstore - .get_block_time(8, slot_duration, &stakes) - .unwrap() - .unwrap() as u64, + blockstore.get_block_time(8).unwrap().unwrap() as u64, expected_time + 2 // At 400ms block duration, 5 slots == 2sec ); } + #[test] + fn test_get_block_time_no_timestamps() { + let vote_keypairs: Vec = (0..6).map(|_| Keypair::new()).collect(); + + // Populate slot 1 with vote transactions, none of which have timestamps + let mut vote_entries: Vec = Vec::new(); + for (i, keypair) in vote_keypairs.iter().enumerate() { + let vote = Vote { + slots: vec![1], + hash: Hash::default(), + timestamp: None, + }; + let vote_ix = vote_instruction::vote(&keypair.pubkey(), &keypair.pubkey(), vote); + let vote_msg = Message::new(&[vote_ix], Some(&keypair.pubkey())); + let vote_tx = Transaction::new(&[keypair], vote_msg, Hash::default()); + + vote_entries.push(next_entry_mut(&mut Hash::default(), 0, vec![vote_tx])); + let mut tick = create_ticks(1, 0, hash(&serialize(&i).unwrap())); + vote_entries.append(&mut tick); + } + let shreds = entries_to_test_shreds(vote_entries, 1, 0, true, 0); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.insert_shreds(shreds, None, false).unwrap(); + // Populate slot 2 with ticks only + fill_blockstore_slot_with_ticks(&blockstore, 6, 2, 1, Hash::default()); + blockstore.set_roots(&[0, 1, 2]).unwrap(); + + // Build epoch vote_accounts HashMap to test stake-weighted block time + let mut stakes = HashMap::new(); + for (i, keypair) in vote_keypairs.iter().enumerate() { + stakes.insert(keypair.pubkey(), (1 + i as u64, Account::default())); + } + let slot_duration = Duration::from_millis(400); + for slot in &[1, 2, 3, 8] { + assert!(blockstore + .cache_block_time(*slot, slot_duration, &stakes) + .is_err()); + assert_eq!(blockstore.get_block_time(*slot).unwrap(), None); + } + } + #[test] fn test_calculate_stake_weighted_timestamp() { let recent_timestamp: UnixTimestamp = 1_578_909_061; diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 130a92bc87c..772a5be76ec 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -68,6 +68,7 @@ pub enum BlockstoreError { UnableToSetOpenFileDescriptorLimit, TransactionStatusSlotMismatch, EmptyEpochStakes, + NoVoteTimestampsInRange, } pub type Result = std::result::Result; From 8572ed275568e663a6aa1d9aab8348df6eba3d38 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 1 Sep 2020 11:05:43 -0600 Subject: [PATCH 5/6] Return block_time with confirmed block, if available --- ledger/src/blockstore.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 9f8b64889e5..1a8c3a6fb92 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1688,6 +1688,7 @@ impl Blockstore { .unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot)); let rewards = self.rewards_cf.get(slot)?.unwrap_or_else(Vec::new); + let block_time = self.blocktime_cf.get(slot)?; let block = ConfirmedBlock { previous_blockhash: previous_blockhash.to_string(), @@ -1699,7 +1700,7 @@ impl Blockstore { slot_transaction_iterator, ), rewards, - block_time: None, // See https://github.com/solana-labs/solana/issues/10089 + block_time, }; return Ok(block); } @@ -5733,7 +5734,7 @@ pub mod tests { let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap(); assert_eq!(confirmed_block.transactions.len(), 100); - let expected_block = ConfirmedBlock { + let mut expected_block = ConfirmedBlock { transactions: expected_transactions .iter() .cloned() @@ -5753,6 +5754,14 @@ pub mod tests { let not_root = ledger.get_confirmed_block(slot + 2, None).unwrap_err(); assert_matches!(not_root, BlockstoreError::SlotNotRooted); + // Test block_time returns, if available + let timestamp = 1_576_183_541; + ledger.blocktime_cf.put(slot + 1, ×tamp).unwrap(); + expected_block.block_time = Some(timestamp); + + let confirmed_block = ledger.get_confirmed_block(slot + 1, None).unwrap(); + assert_eq!(confirmed_block, expected_block); + drop(ledger); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); } From d0a0f50f313501ae4ce3cfbb749f0292d0d26241 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 8 Sep 2020 16:37:50 -0600 Subject: [PATCH 6/6] Add measure and warning to cache-block-time --- core/src/cache_block_time_service.rs | 31 +++++++++++++++++++--------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/core/src/cache_block_time_service.rs b/core/src/cache_block_time_service.rs index db4a48f27a9..07f5eced25e 100644 --- a/core/src/cache_block_time_service.rs +++ b/core/src/cache_block_time_service.rs @@ -1,5 +1,6 @@ use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; use solana_runtime::bank::Bank; use solana_sdk::timing::slot_duration_from_slots_per_year; use std::{ @@ -19,6 +20,8 @@ pub struct CacheBlockTimeService { thread_hdl: JoinHandle<()>, } +const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150; + impl CacheBlockTimeService { #[allow(clippy::new_ret_no_self)] pub fn new( @@ -33,21 +36,30 @@ impl CacheBlockTimeService { if exit.load(Ordering::Relaxed) { break; } - if let Err(RecvTimeoutError::Disconnected) = - Self::cache_block_time(&cache_block_time_receiver, &blockstore) - { - break; + let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1)); + match recv_result { + Err(RecvTimeoutError::Disconnected) => { + break; + } + Ok(bank) => { + let mut cache_block_time_timer = Measure::start("cache_block_time_timer"); + Self::cache_block_time(bank, &blockstore); + cache_block_time_timer.stop(); + if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS { + warn!( + "cache_block_time operation took: {}ms", + cache_block_time_timer.as_ms() + ); + } + } + _ => {} } }) .unwrap(); Self { thread_hdl } } - fn cache_block_time( - cache_block_time_receiver: &CacheBlockTimeReceiver, - blockstore: &Arc, - ) -> Result<(), RecvTimeoutError> { - let bank = cache_block_time_receiver.recv_timeout(Duration::from_secs(1))?; + fn cache_block_time(bank: Arc, blockstore: &Arc) { let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year()); let epoch = bank.epoch_schedule().get_epoch(bank.slot()); let stakes = HashMap::new(); @@ -56,7 +68,6 @@ impl CacheBlockTimeService { if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) { error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e); } - Ok(()) } pub fn join(self) -> thread::Result<()> {