This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
76 changes: 76 additions & 0 deletions core/src/cache_block_time_service.rs
@@ -0,0 +1,76 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_sdk::timing::slot_duration_from_slots_per_year;
use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, Builder, JoinHandle},
    time::Duration,
};

pub type CacheBlockTimeReceiver = Receiver<Arc<Bank>>;
pub type CacheBlockTimeSender = Sender<Arc<Bank>>;

pub struct CacheBlockTimeService {
    thread_hdl: JoinHandle<()>,
}

const CACHE_BLOCK_TIME_WARNING_MS: u64 = 150;

impl CacheBlockTimeService {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        cache_block_time_receiver: CacheBlockTimeReceiver,
        blockstore: Arc<Blockstore>,
        exit: &Arc<AtomicBool>,
    ) -> Self {
        let exit = exit.clone();
        let thread_hdl = Builder::new()
            .name("solana-cache-block-time".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                let recv_result = cache_block_time_receiver.recv_timeout(Duration::from_secs(1));
                match recv_result {
                    Err(RecvTimeoutError::Disconnected) => {
                        break;
                    }
                    Ok(bank) => {
                        let mut cache_block_time_timer = Measure::start("cache_block_time_timer");
                        Self::cache_block_time(bank, &blockstore);
                        cache_block_time_timer.stop();
                        if cache_block_time_timer.as_ms() > CACHE_BLOCK_TIME_WARNING_MS {
                            warn!(
                                "cache_block_time operation took: {}ms",
                                cache_block_time_timer.as_ms()
                            );
                        }
                    }
                    _ => {}
                }
            })
            .unwrap();
        Self { thread_hdl }
    }

    fn cache_block_time(bank: Arc<Bank>, blockstore: &Arc<Blockstore>) {
        let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
        let epoch = bank.epoch_schedule().get_epoch(bank.slot());
        let stakes = HashMap::new();
        let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);

        if let Err(e) = blockstore.cache_block_time(bank.slot(), slot_duration, stakes) {
            error!("cache_block_time failed: slot {:?} {:?}", bank.slot(), e);
        }
Comment on lines 63 to 70
Contributor

How expensive is this? Maybe wrap a measure around it?

Contributor Author

The pieces of this are measured in get_timestamp_slots() and cache_block_time(). Are you looking for a sum?

Contributor

I was mostly wondering whether we could have trouble keeping up with new slots as they come in. But if this operation takes 1-10ms or so, then no worries.

Contributor Author (@CriesofCarrots, Sep 8, 2020)

Empirical data suggests this operation will be in the 2-10ms range for current mainnet-beta throughput. However, it does depend on deserializing blocks to find vote transactions, and that deserialization definitely takes longer as TPS increases. (With about 10k TPS, I was seeing this take about 10x as long on my under-powered GCE instance.) One solution would be to index vote transactions/timestamps in blockstore to avoid the deserialization altogether; possibly as part of the transaction-status-service. I think that could be a follow-up optimization. Wdyt? @mvines

Contributor

Ok cool, that seems fine for now. But how about this:

  1. Move the recv_timeout() out of cache_block_time()
  2. In the main thread loop, wrap a measure around cache_block_time(). Then, if cache_block_time() takes longer than, say, 100ms, emit a warn! or error! log.

Since this is an unbounded channel, if cache_block_time() ever gets backed up and roots start coming in faster than it can process them, the queue grows without limit and we will probably eventually OOM. It'd be nice to get yelled at from the log if this ever starts happening.

Contributor Author

Sounds like a plan

Contributor Author

How does this look? d0a0f50
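
The suggestion in this thread (keep recv_timeout() in the service loop, time each unit of work, and log when it exceeds a threshold) can be sketched roughly as follows. This is a standalone illustration, not the PR's code: it swaps in std::sync::mpsc and std::time::Instant for crossbeam_channel and solana_measure::Measure, uses eprintln! where the PR uses warn!, and the names timed_ms, is_slow, and spawn_worker are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Warning threshold in milliseconds (the PR's constant is 150).
const WARNING_MS: u128 = 150;

/// Runs `work` and returns how long it took, in milliseconds.
fn timed_ms<F: FnOnce()>(work: F) -> u128 {
    let start = Instant::now();
    work();
    start.elapsed().as_millis()
}

/// Returns true when an elapsed time should trigger a warning.
fn is_slow(elapsed_ms: u128) -> bool {
    elapsed_ms > WARNING_MS
}

/// Spawns a worker that drains `receiver`, timing each item.
/// The thread returns the number of items it processed.
fn spawn_worker(receiver: Receiver<u64>, exit: Arc<AtomicBool>) -> JoinHandle<usize> {
    thread::spawn(move || {
        let mut processed = 0;
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }
            match receiver.recv_timeout(Duration::from_millis(100)) {
                Ok(slot) => {
                    let elapsed_ms = timed_ms(|| {
                        // Placeholder for the real per-slot work.
                        let _ = slot.wrapping_mul(2);
                    });
                    if is_slow(elapsed_ms) {
                        eprintln!("slot {} took {}ms", slot, elapsed_ms);
                    }
                    processed += 1;
                }
                // Nothing arrived in time; loop to re-check the exit flag.
                Err(RecvTimeoutError::Timeout) => {}
                // All senders are gone; no more work will arrive.
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
        processed
    })
}
```

A slow-item warning does not bound the queue; it only makes a growing backlog visible in the logs, which is the goal stated in the thread.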

    }

    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}
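
The service hands the blockstore a slot duration and the epoch's vote-account stakes. The blockstore side is not part of this diff, so the following is only a sketch of how a stake-weighted block time could be derived from such inputs, under stated assumptions: each voter contributes one (slot, unix timestamp) pair, timestamps are projected forward to the target slot using the slot duration, and the result is a stake-weighted integer mean. The function name, the string voter keys, and the input shapes are all hypothetical.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Estimates a unix timestamp for `target_slot`.
/// `votes` maps a voter to the (slot, unix_timestamp) of one timestamped vote;
/// `stakes` maps a voter to its stake. Both shapes are illustrative assumptions.
fn stake_weighted_timestamp(
    votes: &HashMap<&str, (u64, i64)>,
    stakes: &HashMap<&str, u64>,
    target_slot: u64,
    slot_duration: Duration,
) -> Option<i64> {
    let mut weighted_sum: i128 = 0;
    let mut total_stake: i128 = 0;
    for (voter, (vote_slot, timestamp)) in votes {
        // Voters without a stake entry carry no weight.
        let stake = i128::from(*stakes.get(voter).unwrap_or(&0));
        // Project the vote's timestamp forward to the target slot.
        let slots_ahead = i128::from(target_slot.saturating_sub(*vote_slot));
        let offset_secs = slot_duration.as_millis() as i128 * slots_ahead / 1000;
        let estimate = i128::from(*timestamp) + offset_secs;
        weighted_sum += estimate * stake;
        total_stake += stake;
    }
    if total_stake == 0 {
        // No staked votes: no estimate can be produced.
        return None;
    }
    Some((weighted_sum / total_stake) as i64)
}
```

With an empty stakes map, which is what cache_block_time() falls back to when epoch_vote_accounts() returns None, this sketch yields no estimate at all.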
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -11,6 +11,7 @@ pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cache_block_time_service;
pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;
41 changes: 41 additions & 0 deletions core/src/replay_stage.rs
@@ -3,6 +3,7 @@
use crate::{
bank_weight_fork_choice::BankWeightForkChoice,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
@@ -106,6 +107,7 @@ pub struct ReplayStageConfig {
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
}

#[derive(Default)]
@@ -235,6 +237,7 @@ impl ReplayStage {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
} = config;

trace!("replay stage");
@@ -494,6 +497,7 @@ impl ReplayStage {
&subscriptions,
&block_commitment_cache,
&mut heaviest_subtree_fork_choice,
&cache_block_time_sender,
)?;
};
voting_time.stop();
@@ -1004,6 +1008,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
cache_block_time_sender: &Option<CacheBlockTimeSender>,
) -> Result<()> {
if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@@ -1029,6 +1034,12 @@ impl ReplayStage {
blockstore
.set_roots(&rooted_slots)
.expect("Ledger set roots failed");
Self::cache_block_times(
blockstore,
bank_forks,
&rooted_slots,
cache_block_time_sender,
);
let highest_confirmed_root = Some(
block_commitment_cache
.read()
@@ -1855,6 +1866,36 @@ impl ReplayStage {
}
}

    fn cache_block_times(
        blockstore: &Arc<Blockstore>,
        bank_forks: &Arc<RwLock<BankForks>>,
        rooted_slots: &[Slot],
        cache_block_time_sender: &Option<CacheBlockTimeSender>,
    ) {
        if let Some(cache_block_time_sender) = cache_block_time_sender {
            for slot in rooted_slots {
                if blockstore
                    .get_block_time(*slot)
                    .unwrap_or_default()
                    .is_none()
                {
                    if let Some(rooted_bank) = bank_forks.read().unwrap().get(*slot) {
                        cache_block_time_sender
                            .send(rooted_bank.clone())
                            .unwrap_or_else(|err| {
                                warn!("cache_block_time_sender failed: {:?}", err)
                            });
                    } else {
                        error!(
                            "rooted_bank {:?} not available in BankForks; block time not cached",
                            slot
                        );
                    }
                }
            }
        }
    }

pub fn get_unlock_switch_vote_slot(cluster_type: ClusterType) -> Slot {
match cluster_type {
ClusterType::Development => 0,
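
The cache_block_times() helper added to replay_stage.rs only queues rooted slots whose block time is not cached yet, and does nothing when no sender is configured. A minimal sketch of that filtering pattern, assuming a plain HashMap in place of the blockstore lookup and std::sync::mpsc in place of crossbeam_channel (queue_uncached_slots and its parameters are hypothetical names):

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Sends every rooted slot that has no cached block time yet and returns the
/// slots that were queued. `cached_times` stands in for the blockstore lookup.
fn queue_uncached_slots(
    cached_times: &HashMap<u64, i64>,
    rooted_slots: &[u64],
    sender: &Option<Sender<u64>>,
) -> Vec<u64> {
    let mut queued = Vec::new();
    // With no sender configured the feature is off and nothing is queued.
    if let Some(sender) = sender {
        for slot in rooted_slots {
            if !cached_times.contains_key(slot) {
                // A send error means the receiver is gone; log it and keep going.
                match sender.send(*slot) {
                    Ok(()) => queued.push(*slot),
                    Err(err) => eprintln!("sender failed: {:?}", err),
                }
            }
        }
    }
    queued
}
```

Checking the cache before sending keeps already-processed roots out of the channel, which matters here because the channel is unbounded.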
22 changes: 8 additions & 14 deletions core/src/rpc.rs
@@ -54,7 +54,6 @@ use solana_sdk::{
stake_history::StakeHistory,
system_instruction,
sysvar::{stake_history, Sysvar},
- timing::slot_duration_from_slots_per_year,
transaction::{self, Transaction},
};
use solana_stake_program::stake_state::StakeState;
@@ -686,18 +685,7 @@ impl JsonRpcRequestProcessor {
.unwrap()
.highest_confirmed_root()
{
- // This calculation currently assumes that bank.slots_per_year will remain unchanged after
- // genesis (ie. that this bank's slot_per_year will be applicable to any rooted slot being
- // queried). If these values will be variable in the future, those timing parameters will
- // need to be stored persistently, and the slot_duration calculation will likely need to be
- // moved upstream into blockstore. Also, an explicit commitment level will need to be set.
- let bank = self.bank(None);
- let slot_duration = slot_duration_from_slots_per_year(bank.slots_per_year());
- let epoch = bank.epoch_schedule().get_epoch(slot);
- let stakes = HashMap::new();
- let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
-
- let result = self.blockstore.get_block_time(slot, slot_duration, stakes);
+ let result = self.blockstore.get_block_time(slot);
self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok().unwrap_or(None))
} else {
@@ -2544,6 +2532,7 @@ pub mod tests {
nonce, rpc_port,
signature::{Keypair, Signer},
system_program, system_transaction,
timing::slot_duration_from_slots_per_year,
transaction::{self, TransactionError},
};
use solana_transaction_status::{EncodedTransaction, TransactionWithStatusMeta, UiMessage};
@@ -2555,7 +2544,7 @@ pub mod tests {
option::COption, solana_sdk::pubkey::Pubkey as SplTokenPubkey,
state::AccountState as TokenAccountState, state::Mint,
};
- use std::collections::HashMap;
+ use std::{collections::HashMap, time::Duration};

const TEST_MINT_LAMPORTS: u64 = 1_000_000;
const TEST_SLOTS_PER_EPOCH: u64 = DELINQUENT_VALIDATOR_SLOT_DISTANCE + 1;
@@ -2656,6 +2645,11 @@ pub mod tests {

for root in roots.iter() {
bank_forks.write().unwrap().set_root(*root, &None, Some(0));
let mut stakes = HashMap::new();
stakes.insert(leader_vote_keypair.pubkey(), (1, Account::default()));
blockstore
.cache_block_time(*root, Duration::from_millis(400), &stakes)
.unwrap();
}
}

4 changes: 4 additions & 0 deletions core/src/tvu.rs
@@ -5,6 +5,7 @@ use crate::{
accounts_background_service::AccountsBackgroundService,
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots,
@@ -96,6 +97,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
snapshot_package_sender: Option<AccountsPackageSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
@@ -191,6 +193,7 @@ impl Tvu {
block_commitment_cache,
transaction_status_sender,
rewards_recorder_sender,
cache_block_time_sender,
};

let replay_stage = ReplayStage::new(
@@ -327,6 +330,7 @@ pub mod tests {
None,
None,
None,
None,
Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender,
verified_vote_receiver,
22 changes: 22 additions & 0 deletions core/src/validator.rs
@@ -2,6 +2,7 @@

use crate::{
broadcast_stage::BroadcastStageType,
cache_block_time_service::{CacheBlockTimeSender, CacheBlockTimeService},
cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
@@ -149,6 +150,8 @@ struct TransactionHistoryServices {
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_sender: Option<CacheBlockTimeSender>,
cache_block_time_service: Option<CacheBlockTimeService>,
}

pub struct Validator {
@@ -157,6 +160,7 @@ pub struct Validator {
rpc_service: Option<(JsonRpcService, PubSubService, RpcBanksService)>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
@@ -244,6 +248,8 @@ impl Validator {
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
},
) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit);

@@ -477,6 +483,7 @@ impl Validator {
config.enable_partition.clone(),
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_time_sender,
snapshot_package_sender,
vote_tracker.clone(),
retransmit_slots_sender,
@@ -523,6 +530,7 @@ impl Validator {
rpc_service,
transaction_status_service,
rewards_recorder_service,
cache_block_time_service,
snapshot_packager_service,
completed_data_sets_service,
tpu,
@@ -587,6 +595,10 @@ impl Validator {
rewards_recorder_service.join()?;
}

if let Some(cache_block_time_service) = self.cache_block_time_service {
cache_block_time_service.join()?;
}

if let Some(s) = self.snapshot_packager_service {
s.join()?;
}
@@ -772,6 +784,14 @@ fn initialize_rpc_transaction_history_services(
let rewards_recorder_sender = Some(rewards_recorder_sender);
let rewards_recorder_service = Some(RewardsRecorderService::new(
rewards_receiver,
blockstore.clone(),
exit,
));

let (cache_block_time_sender, cache_block_time_receiver) = unbounded();
let cache_block_time_sender = Some(cache_block_time_sender);
let cache_block_time_service = Some(CacheBlockTimeService::new(
cache_block_time_receiver,
blockstore,
exit,
));
@@ -780,6 +800,8 @@ fn initialize_rpc_transaction_history_services(
transaction_status_service,
rewards_recorder_sender,
rewards_recorder_service,
cache_block_time_sender,
cache_block_time_service,
}
}
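
The validator.rs changes thread an Option sender and an Option service through the validator and join the service at shutdown. A rough standalone sketch of that wiring, assuming the pair is created only when some feature flag is on; the enabled parameter, the Service type, and init_services are hypothetical, and std::sync::mpsc again stands in for crossbeam_channel:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a background service such as CacheBlockTimeService.
struct Service {
    thread_hdl: JoinHandle<()>,
}

impl Service {
    fn new(receiver: Receiver<u64>, exit: &Arc<AtomicBool>) -> Self {
        let exit = exit.clone();
        let thread_hdl = thread::spawn(move || loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }
            match receiver.recv_timeout(Duration::from_millis(50)) {
                Err(RecvTimeoutError::Disconnected) => break,
                // An item arrived or the wait timed out; loop again.
                _ => {}
            }
        });
        Self { thread_hdl }
    }

    fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}

/// Creates the sender/service pair only when the feature is enabled.
fn init_services(
    enabled: bool,
    exit: &Arc<AtomicBool>,
) -> (Option<Sender<u64>>, Option<Service>) {
    if !enabled {
        return (None, None);
    }
    let (sender, receiver) = channel();
    (Some(sender), Some(Service::new(receiver, exit)))
}
```

Keeping both halves optional lets callers such as the Tvu test in this diff pass None for the sender without spawning a thread.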
