Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl AggregateCommitmentService {
let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);

let ancestors = aggregation_data.bank.status_cache_ancestors();
let ancestors = aggregation_data.bank.seen_transaction_cache_ancestors();
if ancestors.is_empty() {
continue;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ mod tests {
let working_bank = bank_forks.read().unwrap().working_bank();
let vote_state = get_vote_state(vote_pubkey, &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let ancestors = working_bank.seen_transaction_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
Expand Down Expand Up @@ -642,7 +642,7 @@ mod tests {
);

let working_bank = bank_forks.read().unwrap().working_bank();
let ancestors = working_bank.status_cache_ancestors();
let ancestors = working_bank.seen_transaction_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
Expand Down Expand Up @@ -691,7 +691,7 @@ mod tests {
let vote_state =
get_vote_state(validator_vote_keypairs.vote_keypair.pubkey(), &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let ancestors = working_bank.seen_transaction_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
Expand Down
10 changes: 5 additions & 5 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ fn run_bank_forks_snapshot_n<F>(
let bank = bank_forks.write().unwrap().insert(bank);
f(bank.clone_without_scheduler().as_ref(), mint_keypair);
// Set root to make sure we don't end up with too many account storage entries
// and to allow snapshotting of bank and the purging logic on status_cache to
// and to allow snapshotting of bank and the purging logic on `seen_transaction_cache` to
// kick in
if slot % set_root_interval == 0 || slot == last_slot {
if !bank.is_complete() {
Expand Down Expand Up @@ -365,7 +365,7 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
.read()
.unwrap()
.root_bank()
.status_cache
.seen_transaction_cache
.read()
.unwrap()
.roots()
Expand All @@ -380,13 +380,13 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
#[test_case(V1_2_0, Devnet)]
#[test_case(V1_2_0, Testnet)]
#[test_case(V1_2_0, MainnetBeta)]
fn test_bank_forks_status_cache_snapshot(
fn test_bank_forks_seen_transaction_cache_snapshot(
snapshot_version: SnapshotVersion,
cluster_type: ClusterType,
) {
// create banks up to slot (MAX_CACHE_ENTRIES * 2) + 1 while transferring 1 lamport into 2 different accounts each time
// this is done to ensure the AccountStorageEntries keep getting cleaned up as the root moves
// ahead. Also tests the status_cache purge and status cache snapshotting.
// ahead. Also tests the `seen_transaction_cache` purge and snapshotting.
// Makes sure that the last bank is restored correctly
let key1 = Keypair::new().pubkey();
let key2 = Keypair::new().pubkey();
Expand Down Expand Up @@ -502,7 +502,7 @@ fn test_bank_forks_incremental_snapshot(
};

// Set root to make sure we don't end up with too many account storage entries
// and to allow snapshotting of bank and the purging logic on status_cache to
// and to allow snapshotting of bank and the purging logic on `seen_transaction_cache` to
// kick in
if slot % SET_ROOT_INTERVAL == 0 {
// set_root sends a snapshot request
Expand Down
13 changes: 8 additions & 5 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ fn is_finalized(
slot: Slot,
) -> bool {
slot <= block_commitment_cache.highest_super_majority_root()
&& (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot))
&& (blockstore.is_root(slot) || bank.seen_transaction_cache_ancestors().contains(&slot))
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -1357,7 +1357,10 @@ impl JsonRpcRequestProcessor {
} else if commitment.is_confirmed() {
// Check if block is confirmed
let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed()));
if confirmed_bank.status_cache_ancestors().contains(&slot) {
if confirmed_bank
.seen_transaction_cache_ancestors()
.contains(&slot)
{
self.check_blockstore_writes_complete(slot)?;
let result = self
.runtime
Expand Down Expand Up @@ -1487,7 +1490,7 @@ impl JsonRpcRequestProcessor {
let confirmed_bank = self.get_bank_with_config(config)?;
if last_element < end_slot {
let mut confirmed_blocks = confirmed_bank
.status_cache_ancestors()
.seen_transaction_cache_ancestors()
.into_iter()
.filter(|&slot| slot <= end_slot && slot > last_element)
.collect();
Expand Down Expand Up @@ -1565,7 +1568,7 @@ impl JsonRpcRequestProcessor {
.cloned()
.unwrap_or_else(|| start_slot.saturating_sub(1));
let mut confirmed_blocks = confirmed_bank
.status_cache_ancestors()
.seen_transaction_cache_ancestors()
.into_iter()
.filter(|&slot| slot > last_element)
.collect();
Expand Down Expand Up @@ -1779,7 +1782,7 @@ impl JsonRpcRequestProcessor {
Some(mut confirmed_transaction) => {
if commitment.is_confirmed()
&& confirmed_bank // should be redundant
.status_cache_ancestors()
.seen_transaction_cache_ancestors()
.contains(&confirmed_transaction.slot)
{
if confirmed_transaction.block_time.is_none() {
Expand Down
8 changes: 4 additions & 4 deletions runtime/src/accounts_background_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl SendDroppedBankCallback {

pub struct SnapshotRequest {
pub snapshot_root_bank: Arc<Bank>,
pub status_cache_slot_deltas: Vec<BankSlotDelta>,
pub seen_transaction_cache_slot_deltas: Vec<BankSlotDelta>,
pub request_kind: SnapshotRequestKind,

/// The instant this request was send to the queue.
Expand Down Expand Up @@ -280,7 +280,7 @@ impl SnapshotRequestHandler {
let mut total_time = Measure::start("snapshot_request_receiver_total_time");
let SnapshotRequest {
snapshot_root_bank,
status_cache_slot_deltas,
seen_transaction_cache_slot_deltas,
request_kind,
enqueued: _,
} = snapshot_request;
Expand Down Expand Up @@ -367,7 +367,7 @@ impl SnapshotRequestHandler {
accounts_package_kind,
&snapshot_root_bank,
snapshot_storages,
status_cache_slot_deltas,
seen_transaction_cache_slot_deltas,
accounts_hash_for_testing,
),
AccountsPackageKind::EpochAccountsHash => panic!(
Expand Down Expand Up @@ -887,7 +887,7 @@ mod test {
let send_snapshot_request = |snapshot_root_bank, request_kind| {
let snapshot_request = SnapshotRequest {
snapshot_root_bank,
status_cache_slot_deltas: Vec::default(),
seen_transaction_cache_slot_deltas: Vec::default(),
request_kind,
enqueued: Instant::now(),
};
Expand Down
31 changes: 24 additions & 7 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ impl PartialEq for Bank {
let Self {
skipped_rewrites: _,
rc: _,
seen_transaction_cache: _,
status_cache: _,
blockhash_queue,
ancestors,
Expand Down Expand Up @@ -744,7 +745,10 @@ pub struct Bank {
/// References to accounts, parent and signature status
pub rc: BankRc,

/// A cache of signature statuses
/// A cache of seen transactions by message hash
pub seen_transaction_cache: Arc<RwLock<BankStatusCache>>,

/// A cache of transaction statuses by signature
pub status_cache: Arc<RwLock<BankStatusCache>>,

/// FIFO queue of `recent_blockhash` items
Expand Down Expand Up @@ -1082,6 +1086,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: BankRc::new(accounts),
seen_transaction_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
blockhash_queue: RwLock::<BlockhashQueue>::default(),
ancestors: Ancestors::default(),
Expand Down Expand Up @@ -1286,6 +1291,8 @@ impl Bank {
}
});

let (seen_transaction_cache, seen_transaction_cache_time_us) =
measure_us!(Arc::clone(&parent.seen_transaction_cache));
let (status_cache, status_cache_time_us) = measure_us!(Arc::clone(&parent.status_cache));

let (fee_rate_governor, fee_components_time_us) = measure_us!(
Expand Down Expand Up @@ -1320,6 +1327,7 @@ impl Bank {
let mut new = Self {
skipped_rewrites: Mutex::default(),
rc,
seen_transaction_cache,
status_cache,
slot,
bank_id,
Expand Down Expand Up @@ -1482,6 +1490,7 @@ impl Bank {
NewBankTimings {
bank_rc_creation_time_us,
total_elapsed_time_us: time.as_us(),
seen_transaction_cache_time_us,
status_cache_time_us,
fee_components_time_us,
blockhash_queue_time_us,
Expand Down Expand Up @@ -1815,6 +1824,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: bank_rc,
seen_transaction_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
blockhash_queue: RwLock::new(fields.blockhash_queue),
ancestors,
Expand Down Expand Up @@ -2092,8 +2102,8 @@ impl Bank {
self.freeze_started.load(Relaxed)
}

pub fn status_cache_ancestors(&self) -> Vec<u64> {
let mut roots = self.status_cache.read().unwrap().roots().clone();
pub fn seen_transaction_cache_ancestors(&self) -> Vec<u64> {
let mut roots = self.seen_transaction_cache.read().unwrap().roots().clone();
let min = roots.iter().min().cloned().unwrap_or(0);
for ancestor in self.ancestors.keys() {
if ancestor >= min {
Expand Down Expand Up @@ -2737,9 +2747,10 @@ impl Bank {
*self.rc.parent.write().unwrap() = None;

let mut squash_cache_time = Measure::start("squash_cache_time");
roots
.iter()
.for_each(|slot| self.status_cache.write().unwrap().add_root(*slot));
roots.iter().for_each(|slot| {
self.seen_transaction_cache.write().unwrap().add_root(*slot);
self.status_cache.write().unwrap().add_root(*slot);
});
squash_cache_time.stop();

SquashTiming {
Expand Down Expand Up @@ -3005,10 +3016,15 @@ impl Bank {

/// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) {
self.seen_transaction_cache.write().unwrap().clear();
self.status_cache.write().unwrap().clear();
}

pub fn clear_slot_signatures(&self, slot: Slot) {
self.seen_transaction_cache
.write()
.unwrap()
.clear_slot_entries(slot);
self.status_cache.write().unwrap().clear_slot_entries(slot);
}

Expand All @@ -3017,13 +3033,14 @@ impl Bank {
sanitized_txs: &[impl TransactionWithMeta],
processing_results: &[TransactionProcessingResult],
) {
let mut seen_transaction_cache = self.seen_transaction_cache.write().unwrap();
let mut status_cache = self.status_cache.write().unwrap();
assert_eq!(sanitized_txs.len(), processing_results.len());
for (tx, processing_result) in sanitized_txs.iter().zip(processing_results) {
if let Ok(processed_tx) = &processing_result {
// Add the message hash to the status cache to ensure that this message
// won't be processed again with a different signature.
status_cache.insert(
seen_transaction_cache.insert(
tx.recent_blockhash(),
tx.message_hash(),
self.slot(),
Expand Down
12 changes: 6 additions & 6 deletions runtime/src/bank/check_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Bank {
max_age,
error_counters,
);
self.check_status_cache(sanitized_txs, lock_results, error_counters)
self.check_seen_transaction_cache(sanitized_txs, lock_results, error_counters)
}

fn check_age_and_compute_budget_limits<Tx: TransactionWithMeta>(
Expand Down Expand Up @@ -249,15 +249,15 @@ impl Bank {
Some((*nonce_address, nonce_account, nonce_data))
}

fn check_status_cache<Tx: TransactionWithMeta>(
fn check_seen_transaction_cache<Tx: TransactionWithMeta>(
&self,
sanitized_txs: &[impl core::borrow::Borrow<Tx>],
lock_results: Vec<TransactionCheckResult>,
error_counters: &mut TransactionErrorMetrics,
) -> Vec<TransactionCheckResult> {
// Do allocation before acquiring the lock on the status cache.
// Do allocation before acquiring the lock on the seen transaction cache.
let mut check_results = Vec::with_capacity(sanitized_txs.len());
let rcache = self.status_cache.read().unwrap();
let rcache = self.seen_transaction_cache.read().unwrap();

check_results.extend(sanitized_txs.iter().zip(lock_results).map(
|(sanitized_tx, lock_result)| {
Expand All @@ -278,11 +278,11 @@ impl Bank {
fn is_transaction_already_processed(
&self,
sanitized_tx: &impl TransactionWithMeta,
status_cache: &BankStatusCache,
seen_transaction_cache: &BankStatusCache,
) -> bool {
let key = sanitized_tx.message_hash();
let transaction_blockhash = sanitized_tx.recent_blockhash();
status_cache
seen_transaction_cache
.get_status(key, transaction_blockhash, &self.ancestors)
.is_some()
}
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/bank/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) struct RewardsMetrics {
pub(crate) struct NewBankTimings {
pub(crate) bank_rc_creation_time_us: u64,
pub(crate) total_elapsed_time_us: u64,
pub(crate) seen_transaction_cache_time_us: u64,
pub(crate) status_cache_time_us: u64,
pub(crate) fee_components_time_us: u64,
pub(crate) blockhash_queue_time_us: u64,
Expand Down Expand Up @@ -108,6 +109,7 @@ pub(crate) fn report_new_bank_metrics(
("parent_slot", parent_slot, i64),
("bank_rc_creation_us", timings.bank_rc_creation_time_us, i64),
("total_elapsed_us", timings.total_elapsed_time_us, i64),
("seen_transaction_cache_us", timings.seen_transaction_cache_time_us, i64),
("status_cache_us", timings.status_cache_time_us, i64),
("fee_components_us", timings.fee_components_time_us, i64),
("blockhash_queue_us", timings.blockhash_queue_time_us, i64),
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/bank/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4773,7 +4773,7 @@ fn test_get_filtered_indexed_accounts() {
}

#[test]
fn test_status_cache_ancestors() {
fn test_seen_transaction_cache_ancestors() {
solana_logger::setup();
let (parent, _bank_forks) = create_simple_test_arc_bank(500);
let bank1 = Arc::new(new_from_parent(parent));
Expand All @@ -4785,7 +4785,7 @@ fn test_status_cache_ancestors() {

let bank = new_from_parent(bank);
assert_eq!(
bank.status_cache_ancestors(),
bank.seen_transaction_cache_ancestors(),
(bank.slot() - MAX_CACHE_ENTRIES as u64..=bank.slot()).collect::<Vec<_>>()
);
}
Expand Down
Loading
Loading