Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ pub fn cli_app() -> Command {
.long("block-cache-size")
.value_name("SIZE")
.help("Specifies how many blocks the database should cache in memory")
.default_value("5")
.default_value("0")
.action(ArgAction::Set)
.display_order(0)
)
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::LevelDb;
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64);
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 0;
pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
pub const DEFAULT_STATE_CACHE_HEADROOM: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
Expand All @@ -34,7 +34,7 @@ pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreConfig {
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: NonZeroUsize,
pub block_cache_size: usize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: NonZeroUsize,
/// Minimum number of states to cull from the state cache upon fullness.
Expand Down
194 changes: 115 additions & 79 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The hot database also contains all blocks.
pub hot_db: Hot,
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
block_cache: Mutex<BlockCache<E>>,
block_cache: Option<Mutex<BlockCache<E>>>,
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
Expand Down Expand Up @@ -229,7 +229,9 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
cold_db: MemoryStore::open(),
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
block_cache: NonZeroUsize::new(config.block_cache_size)
.map(BlockCache::new)
.map(Mutex::new),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
Expand Down Expand Up @@ -281,7 +283,9 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?,
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
hot_db,
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
block_cache: NonZeroUsize::new(config.block_cache_size)
.map(BlockCache::new)
.map(Mutex::new),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
Expand Down Expand Up @@ -488,14 +492,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn register_metrics(&self) {
let hsc_metrics = self.historic_state_cache.lock().metrics();

metrics::set_gauge(
&metrics::STORE_BEACON_BLOCK_CACHE_SIZE,
self.block_cache.lock().block_cache.len() as i64,
);
metrics::set_gauge(
&metrics::STORE_BEACON_BLOB_CACHE_SIZE,
self.block_cache.lock().blob_cache.len() as i64,
);
if let Some(block_cache) = &self.block_cache {
let cache = block_cache.lock();
metrics::set_gauge(
&metrics::STORE_BEACON_BLOCK_CACHE_SIZE,
cache.block_cache.len() as i64,
);
metrics::set_gauge(
&metrics::STORE_BEACON_BLOB_CACHE_SIZE,
cache.blob_cache.len() as i64,
);
}
let state_cache = self.state_cache.lock();
metrics::set_gauge(
&metrics::STORE_BEACON_STATE_CACHE_SIZE,
Expand Down Expand Up @@ -553,7 +560,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?;
self.hot_db.do_atomically(ops)?;
// Update cache.
self.block_cache.lock().put_block(*block_root, block);
self.block_cache
.as_ref()
.inspect(|cache| cache.lock().put_block(*block_root, block));
Ok(())
}

Expand Down Expand Up @@ -605,7 +614,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);

// Check the cache.
if let Some(block) = self.block_cache.lock().get_block(block_root) {
if let Some(cache) = &self.block_cache
&& let Some(block) = cache.lock().get_block(block_root)
{
metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT);
return Ok(Some(DatabaseBlock::Full(block.clone())));
}
Expand All @@ -630,8 +641,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

// Add to cache.
self.block_cache
.lock()
.put_block(*block_root, full_block.clone());
.as_ref()
.inspect(|cache| cache.lock().put_block(*block_root, full_block.clone()));

DatabaseBlock::Full(full_block)
} else if !self.config.prune_payloads {
Expand Down Expand Up @@ -902,7 +913,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

/// Delete a block from the store and the block cache.
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().delete(block_root);
self.block_cache
.as_ref()
.inspect(|cache| cache.lock().delete(block_root));
self.hot_db
.key_delete(DBColumn::BeaconBlock, block_root.as_slice())?;
self.hot_db
Expand All @@ -917,7 +930,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block_root.as_slice(),
&blobs.as_ssz_bytes(),
)?;
self.block_cache.lock().put_blobs(*block_root, blobs);
self.block_cache
.as_ref()
.inspect(|cache| cache.lock().put_blobs(*block_root, blobs));
Ok(())
}

Expand Down Expand Up @@ -945,9 +960,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blobs_db
.put(&DATA_COLUMN_CUSTODY_INFO_KEY, &data_column_custody_info)?;

self.block_cache
.lock()
.put_data_column_custody_info(Some(data_column_custody_info));
self.block_cache.as_ref().inspect(|cache| {
cache
.lock()
.put_data_column_custody_info(Some(data_column_custody_info))
});

Ok(())
}
Expand All @@ -964,8 +981,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&data_column.as_ssz_bytes(),
)?;
self.block_cache
.lock()
.put_data_column(*block_root, data_column);
.as_ref()
.inspect(|cache| cache.lock().put_data_column(*block_root, data_column));
}
Ok(())
}
Expand Down Expand Up @@ -1399,7 +1416,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

// Update database whilst holding a lock on cache, to ensure that the cache updates
// atomically with the database.
let mut guard = self.block_cache.lock();
let guard = self.block_cache.as_ref().map(|cache| cache.lock());

let blob_cache_ops = blobs_ops.clone();
// Try to execute blobs store ops.
Expand Down Expand Up @@ -1446,57 +1463,68 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Err(e);
}

for op in hot_db_cache_ops {
// Delete from the state cache.
for op in &hot_db_cache_ops {
match op {
StoreOp::PutBlock(block_root, block) => {
guard.put_block(block_root, (*block).clone());
StoreOp::DeleteBlock(block_root) => {
self.state_cache.lock().delete_block_states(block_root);
}
StoreOp::DeleteState(state_root, _) => {
self.state_cache.lock().delete_state(state_root)
}
_ => (),
}
}

StoreOp::PutBlobs(_, _) => (),
// If the block cache is enabled, also delete from the block cache.
if let Some(mut guard) = guard {
for op in hot_db_cache_ops {
match op {
StoreOp::PutBlock(block_root, block) => {
guard.put_block(block_root, (*block).clone());
}

StoreOp::PutDataColumns(_, _) => (),
StoreOp::PutBlobs(_, _) => (),

StoreOp::PutState(_, _) => (),
StoreOp::PutDataColumns(_, _) => (),

StoreOp::PutStateSummary(_, _) => (),
StoreOp::PutState(_, _) => (),

StoreOp::DeleteBlock(block_root) => {
guard.delete_block(&block_root);
self.state_cache.lock().delete_block_states(&block_root);
}
StoreOp::PutStateSummary(_, _) => (),

StoreOp::DeleteState(state_root, _) => {
self.state_cache.lock().delete_state(&state_root)
}
StoreOp::DeleteBlock(block_root) => {
guard.delete_block(&block_root);
}

StoreOp::DeleteBlobs(_) => (),
StoreOp::DeleteState(_, _) => (),

StoreOp::DeleteDataColumns(_, _) => (),
StoreOp::DeleteBlobs(_) => (),

StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::DeleteDataColumns(_, _) => (),

StoreOp::DeleteSyncCommitteeBranch(_) => (),
StoreOp::DeleteExecutionPayload(_) => (),

StoreOp::KeyValueOp(_) => (),
}
}
StoreOp::DeleteSyncCommitteeBranch(_) => (),

for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard.put_blobs(block_root, blobs);
StoreOp::KeyValueOp(_) => (),
}
}

StoreOp::DeleteBlobs(block_root) => {
guard.delete_blobs(&block_root);
}
for op in blob_cache_ops {
match op {
StoreOp::PutBlobs(block_root, blobs) => {
guard.put_blobs(block_root, blobs);
}

_ => (),
StoreOp::DeleteBlobs(block_root) => {
guard.delete_blobs(&block_root);
}

_ => (),
}
}
}

drop(guard);

Ok(())
}

Expand Down Expand Up @@ -2425,21 +2453,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// If custody info doesn't exist in the cache,
/// try to fetch from the DB and prime the cache.
pub fn get_data_column_custody_info(&self) -> Result<Option<DataColumnCustodyInfo>, Error> {
let Some(data_column_custody_info) = self.block_cache.lock().get_data_column_custody_info()
else {
let data_column_custody_info = self
.blobs_db
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;
if let Some(cache) = &self.block_cache
&& let Some(data_column_custody_info) = cache.lock().get_data_column_custody_info()
{
return Ok(Some(data_column_custody_info));
}
let data_column_custody_info = self
.blobs_db
.get::<DataColumnCustodyInfo>(&DATA_COLUMN_CUSTODY_INFO_KEY)?;

// Update the cache
self.block_cache
// Update the cache
self.block_cache.as_ref().inspect(|cache| {
cache
.lock()
.put_data_column_custody_info(data_column_custody_info.clone());

return Ok(data_column_custody_info);
};
.put_data_column_custody_info(data_column_custody_info.clone())
});

Ok(Some(data_column_custody_info))
Ok(data_column_custody_info)
}

/// Fetch all columns for a given block from the store.
Expand All @@ -2460,9 +2490,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Fetch blobs for a given block from the store.
pub fn get_blobs(&self, block_root: &Hash256) -> Result<BlobSidecarListFromRoot<E>, Error> {
// Check the cache.
if let Some(blobs) = self.block_cache.lock().get_blobs(block_root) {
if let Some(blobs) = self
.block_cache
.as_ref()
.and_then(|cache| cache.lock().get_blobs(block_root).cloned())
{
metrics::inc_counter(&metrics::BEACON_BLOBS_CACHE_HIT_COUNT);
return Ok(blobs.clone().into());
return Ok(blobs.into());
}

match self
Expand All @@ -2481,8 +2515,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
{
let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?;
self.block_cache
.lock()
.put_blobs(*block_root, blobs.clone());
.as_ref()
.inspect(|cache| cache.lock().put_blobs(*block_root, blobs.clone()));

Ok(BlobSidecarListFromRoot::Blobs(blobs))
} else {
Expand Down Expand Up @@ -2515,8 +2549,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// Check the cache.
if let Some(data_column) = self
.block_cache
.lock()
.get_data_column(block_root, column_index)
.as_ref()
.and_then(|cache| cache.lock().get_data_column(block_root, column_index))
{
metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
return Ok(Some(data_column));
Expand All @@ -2528,9 +2562,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)? {
Some(ref data_column_bytes) => {
let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
self.block_cache
.lock()
.put_data_column(*block_root, data_column.clone());
self.block_cache.as_ref().inspect(|cache| {
cache
.lock()
.put_data_column(*block_root, data_column.clone())
});
Ok(Some(data_column))
}
None => Ok(None),
Expand Down Expand Up @@ -3264,11 +3300,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

// Remove deleted blobs from the cache.
let mut block_cache = self.block_cache.lock();
for block_root in removed_block_roots {
block_cache.delete_blobs(&block_root);
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
for block_root in removed_block_roots {
block_cache.delete_blobs(&block_root);
}
}
drop(block_cache);

let new_blob_info = BlobInfo {
oldest_blob_slot: Some(end_slot + 1),
Expand Down
2 changes: 1 addition & 1 deletion book/src/help_bn.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Options:
Data directory for the blobs database.
--block-cache-size <SIZE>
Specifies how many blocks the database should cache in memory
[default: 5]
[default: 0]
--boot-nodes <ENR/MULTIADDR LIST>
One or more comma-delimited base64-encoded ENR's to bootstrap the p2p
network. Multiaddr is also supported.
Expand Down
Loading
Loading