Commit 2b91f0e

use state cache to avoid unnecessary block replay

1 parent 00cf5fc commit 2b91f0e
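
In short: before this change, every read of a cold (pre-finalization) state lying between restore points replayed all blocks from the nearest restore point below the requested slot. This commit adds an LRU cache of up to 32 replayed states, keyed by slot: an exact-slot hit is returned directly, and on a miss any cached state in the same restore-point interval that is closer to the target slot is used as the replay base instead of the restore point itself.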

File tree

1 file changed: +44 -9 lines changed

beacon_node/store/src/hot_cold_store.rs

Lines changed: 44 additions & 9 deletions
@@ -40,7 +40,10 @@ use std::sync::Arc;
 use std::time::Duration;
 use types::*;
 
-/// On-disk database that stores finalized states efficiently.
+/// The number of replayed states to be cached.
+const STATE_CACHE_SIZE: usize = 32;
+
+/// On-disk database that stores finalized states efficiently.
 ///
 /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
 /// intermittent "restore point" states pre-finalization.
@@ -62,6 +65,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     pub hot_db: Hot,
     /// LRU cache of deserialized blocks. Updated whenever a block is loaded.
     block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
+    /// LRU cache of replayed states.
+    state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
     /// Chain spec.
     pub(crate) spec: ChainSpec,
     /// Logger.
@@ -129,6 +134,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
             cold_db: MemoryStore::open(),
             hot_db: MemoryStore::open(),
             block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
+            state_cache: Mutex::new(LruCache::new(STATE_CACHE_SIZE)),
             config,
             spec,
             log,
@@ -162,6 +168,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             cold_db: LevelDB::open(cold_path)?,
             hot_db: LevelDB::open(hot_path)?,
             block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
+            state_cache: Mutex::new(LruCache::new(STATE_CACHE_SIZE)),
             config,
             spec,
             log,
@@ -579,6 +586,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
     /// (which will be deleted by this function but shouldn't be).
     pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
+        // Delete the state from the cache.
+        self.state_cache.lock().pop(&slot);
+
         // Delete the state summary.
         self.hot_db
             .key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
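
As a minimal standalone sketch of the cache lifecycle this commit relies on (the `pop` on deletion above, plus the `get`/`put` on the read path in the next hunk), assuming the `lru` crate's pre-0.8 API in which `LruCache::new` takes a plain `usize`; the `u64` key and `String` value are stand-ins for `Slot` and `BeaconState<E>`:

    use lru::LruCache; // assumes lru = "0.7" (`new` takes a plain usize)

    fn main() {
        // Keyed by slot, like the new `state_cache`; a `String` stands in
        // for the (much larger) `BeaconState`.
        let mut cache: LruCache<u64, String> = LruCache::new(32);

        // Read-path miss: replay blocks (elided), then cache the result.
        assert!(cache.get(&8000).is_none());
        cache.put(8000, String::from("state@8000"));

        // Read-path hit: return a clone, skipping block replay entirely.
        assert_eq!(cache.get(&8000).map(String::clone), Some("state@8000".into()));

        // Deletion path (`delete_state` above): evict by slot so a later
        // read cannot be served a stale state.
        cache.pop(&8000);
        assert!(cache.get(&8000).is_none());
    }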
@@ -977,40 +987,65 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
     /// Load a frozen state that lies between restore points.
     fn load_cold_intermediate_state(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
+        if let Some(state) = self.state_cache.lock().get(&slot) {
+            return Ok(state.clone());
+        }
+
         // 1. Load the restore points either side of the intermediate state.
         let low_restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point;
         let high_restore_point_idx = low_restore_point_idx + 1;
 
+        // Use the low restore point as the base state.
+        let mut low_slot: Slot =
+            Slot::new(low_restore_point_idx * self.config.slots_per_restore_point);
+        let mut low_state: BeaconState<E> =
+            self.load_restore_point_by_index(low_restore_point_idx)?;
+
+        // Try to get a more recent state from the cache to avoid massive block replay.
+        for (s, state) in self.state_cache.lock().iter() {
+            if s.as_u64() / self.config.slots_per_restore_point == low_restore_point_idx
+                && *s < slot
+                && low_slot < *s
+            {
+                low_slot = *s;
+                low_state = state.clone();
+            }
+        }
+
         // Acquire the read lock, so that the split can't change while this is happening.
         let split = self.split.read_recursive();
 
-        let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?;
         let high_restore_point = self.get_restore_point(high_restore_point_idx, &split)?;
 
-        // 2. Load the blocks from the high restore point back to the low restore point.
+        // 2. Load the blocks from the high restore point back to the low point.
         let blocks = self.load_blocks_to_replay(
-            low_restore_point.slot(),
+            low_slot,
             slot,
             self.get_high_restore_point_block_root(&high_restore_point, slot)?,
         )?;
 
-        // 3. Replay the blocks on top of the low restore point.
+        // 3. Replay the blocks on top of the low point.
         // Use a forwards state root iterator to avoid doing any tree hashing.
         // The state root of the high restore point should never be used, so is safely set to 0.
         let state_root_iter = self.forwards_state_roots_iterator_until(
-            low_restore_point.slot(),
+            low_slot,
             slot,
             || (high_restore_point, Hash256::zero()),
             &self.spec,
         )?;
 
-        self.replay_blocks(
-            low_restore_point,
+        let state = self.replay_blocks(
+            low_state.clone(),
             blocks,
             slot,
             Some(state_root_iter),
             StateRootStrategy::Accurate,
-        )
+        )?;
+
+        // Replay succeeded, so put the state in the cache.
+        self.state_cache.lock().put(slot, state.clone());
+
+        Ok(state)
     }
 
     /// Get the restore point with the given index, or if it is out of bounds, the split state.
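
The heart of the change is the base-state selection in `load_cold_intermediate_state`: rather than always replaying from the low restore point, it scans the cache for a state in the same restore-point interval that is strictly closer to (but not past) the target slot. A standalone sketch of that selection, with `u64` slots and a plain map standing in for the LRU cache; the names here are illustrative, not from the commit:

    use std::collections::HashMap;

    /// Pick the slot to replay from: the low restore point by default,
    /// or a cached slot in the same restore-point interval that is
    /// strictly closer to `target` without passing it.
    fn pick_replay_base(
        target: u64,
        slots_per_restore_point: u64,
        cached_slots: &HashMap<u64, &'static str>, // slot -> state (stand-in)
    ) -> u64 {
        let low_idx = target / slots_per_restore_point;
        let mut base = low_idx * slots_per_restore_point; // low restore point slot

        for (&slot, _state) in cached_slots {
            // Same interval, not past the target, and better than the current base;
            // mirrors the commit's `*s < slot && low_slot < *s` check.
            if slot / slots_per_restore_point == low_idx && slot < target && slot > base {
                base = slot;
            }
        }
        base
    }

    fn main() {
        let mut cache = HashMap::new();
        cache.insert(8000, "state@8000");

        // With slots_per_restore_point = 2048, the low restore point for
        // slot 8001 is 3 * 2048 = 6144: 1857 blocks to replay from scratch.
        assert_eq!(pick_replay_base(8001, 2048, &HashMap::new()), 6144);

        // With the state at slot 8000 cached, only one block needs replaying.
        assert_eq!(pick_replay_base(8001, 2048, &cache), 8000);
    }

A cached state at exactly the target slot never reaches this scan, since the exact-slot `get` at the top of the function returns it directly.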
