Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2688,6 +2688,7 @@ impl Bank {
}

/// Forget all signatures. Useful for benchmarking.
#[cfg(feature = "dev-context-only-utils")]
pub fn clear_signatures(&self) {
self.status_cache.write().unwrap().clear();
}
Expand Down
127 changes: 79 additions & 48 deletions runtime/src/status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@ use {
std::sync::{Arc, Mutex},
};

// The maximum number of entries to store in the cache. This is the same as the number of recent
// blockhashes because we automatically reject txs that use older blockhashes so we don't need to
// track those explicitly.
pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;

// Only store 20 bytes of the tx keys processed to save some memory.
const CACHED_KEY_SIZE: usize = 20;

// Store forks in a single chunk of memory to avoid another lookup.
// Store forks in a single chunk of memory to avoid another hash lookup.
pub type ForkStatus<T> = Vec<(Slot, T)>;

// The type of the key used in the cache.
pub(crate) type KeySlice = [u8; CACHED_KEY_SIZE];

type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;

// Map of Hash and status
pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;

// A Map of hash + the highest fork it's been observed on along with
// the key offset and a Map of the key slice + Fork status for that key
type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;

// A map of keys recorded in each fork; used to serialize for snapshots easily.
// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
// The type used for StatusCache::slot_deltas. See the field definition for more details.
type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;

// The statuses added during a slot, can be used to build on top of a status cache or to
Expand All @@ -39,9 +48,12 @@ pub type SlotDelta<T> = (Slot, bool, Status<T>);
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Clone, Debug)]
pub struct StatusCache<T: Serialize + Clone> {
// cache[blockhash][tx_key] => [(fork1_slot, tx_result), (fork2_slot, tx_result), ...] used to
// check if a tx_key was seen on a fork and for rpc to retrieve the tx_result
cache: KeyStatusMap<T>,
roots: HashSet<Slot>,
/// all keys seen during a fork/slot
// slot_deltas[slot][blockhash] => [(tx_key, tx_result), ...] used to serialize for snapshots
// and to rebuild cache[blockhash][tx_key] from a snapshot
slot_deltas: SlotDeltaMap<T>,
}

Expand All @@ -56,33 +68,15 @@ impl<T: Serialize + Clone> Default for StatusCache<T> {
}
}

impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
fn eq(&self, other: &Self) -> bool {
self.roots == other.roots
&& self
.cache
.iter()
.all(|(hash, (slot, key_index, hash_map))| {
if let Some((other_slot, other_key_index, other_hash_map)) =
other.cache.get(hash)
{
if slot == other_slot && key_index == other_key_index {
return hash_map.iter().all(|(slice, fork_map)| {
if let Some(other_fork_map) = other_hash_map.get(slice) {
// all this work just to compare the highest forks in the fork map
// per entry
return fork_map.last() == other_fork_map.last();
}
false
});
}
}
false
})
}
}

impl<T: Serialize + Clone> StatusCache<T> {
/// Clear all entries for a slot.
///
/// This is used when a slot is purged from the cache, see
/// ReplayStage::purge_unconfirmed_duplicate_slot().
///
/// When this is called, it's guaranteed that there are no threads inserting new entries for
/// this slot. root_slot_deltas() also never accesses slots that are being cleared because roots
/// are never purged.
pub fn clear_slot_entries(&mut self, slot: Slot) {
let slot_deltas = self.slot_deltas.remove(&slot);
if let Some(slot_deltas) = slot_deltas {
Expand Down Expand Up @@ -146,9 +140,9 @@ impl<T: Serialize + Clone> StatusCache<T> {
None
}

/// Search for a key with any blockhash
/// Prefer get_status for performance reasons, it doesn't need
/// to search all blockhashes.
/// Search for a key with any blockhash.
///
/// Prefer get_status for performance reasons, it doesn't need to search all blockhashes.
pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
&self,
key: K,
Expand All @@ -166,8 +160,10 @@ impl<T: Serialize + Clone> StatusCache<T> {
None
}

/// Add a known root fork. Roots are always valid ancestors.
/// After MAX_CACHE_ENTRIES, roots are removed, and any old keys are cleared.
/// Add a known root fork.
///
/// Roots are always valid ancestors. After MAX_CACHE_ENTRIES, roots are removed, and any old
/// keys are cleared.
pub fn add_root(&mut self, fork: Slot) {
self.roots.insert(fork);
self.purge_roots();
Expand All @@ -177,7 +173,7 @@ impl<T: Serialize + Clone> StatusCache<T> {
&self.roots
}

/// Insert a new key for a specific slot.
/// Insert a new key using the given blockhash at the given slot.
pub fn insert<K: AsRef<[u8]>>(
&mut self,
transaction_blockhash: &Hash,
Expand Down Expand Up @@ -224,7 +220,7 @@ impl<T: Serialize + Clone> StatusCache<T> {
}
}

/// Clear for testing
#[cfg(feature = "dev-context-only-utils")]
pub fn clear(&mut self) {
for v in self.cache.values_mut() {
v.2 = HashMap::new();
Expand All @@ -235,7 +231,13 @@ impl<T: Serialize + Clone> StatusCache<T> {
.for_each(|(_, status)| status.lock().unwrap().clear());
}

/// Get the statuses for all the root slots
/// Get the statuses for all the root slots.
///
/// This is never called concurrently with add_root(), and for a slot to be a root there must be
/// no new entries for that slot, so there are no races.
///
/// See ReplayStage::handle_new_root() => BankForks::set_root() =>
/// BankForks::do_set_root_return_metrics() => root_slot_deltas()
pub fn root_slot_deltas(&self) -> Vec<SlotDelta<T>> {
self.roots()
.iter()
Expand All @@ -249,7 +251,10 @@ impl<T: Serialize + Clone> StatusCache<T> {
.collect()
}

// replay deltas into a status_cache allows "appending" data
/// Populate the cache with the slot deltas from a snapshot.
///
/// Really badly named method. See load_bank_forks() => ... =>
/// rebuild_bank_from_snapshot() => [load slot deltas from snapshot] => append()
pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
for (slot, is_root, statuses) in slot_deltas {
statuses
Expand All @@ -267,13 +272,6 @@ impl<T: Serialize + Clone> StatusCache<T> {
}
}

pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
// play all deltas back into the status cache
let mut me = Self::default();
me.append(slot_deltas);
me
}

fn insert_with_slice(
&mut self,
transaction_blockhash: &Hash,
Expand All @@ -294,8 +292,7 @@ impl<T: Serialize + Clone> StatusCache<T> {
self.add_to_slot_delta(transaction_blockhash, slot, key_index, key_slice, res);
}

// Add this key slice to the list of key slices for this slot and blockhash
// combo.
// Add this key slice to the list of key slices for this slot and blockhash combo.
fn add_to_slot_delta(
&mut self,
transaction_blockhash: &Hash,
Expand All @@ -318,6 +315,40 @@ mod tests {

type BankStatusCache = StatusCache<()>;

impl<T: Serialize + Clone> StatusCache<T> {
fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
let mut cache = Self::default();
cache.append(slot_deltas);
cache
}
}

impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
fn eq(&self, other: &Self) -> bool {
self.roots == other.roots
&& self
.cache
.iter()
.all(|(hash, (slot, key_index, hash_map))| {
if let Some((other_slot, other_key_index, other_hash_map)) =
other.cache.get(hash)
{
if slot == other_slot && key_index == other_key_index {
return hash_map.iter().all(|(slice, fork_map)| {
if let Some(other_fork_map) = other_hash_map.get(slice) {
// all this work just to compare the highest forks in the fork map
// per entry
return fork_map.last() == other_fork_map.last();
}
false
});
}
}
false
})
}
}

#[test]
fn test_empty_has_no_sigs() {
let sig = Signature::default();
Expand Down
Loading