From b879b6766725db96ffef7e3672b481bb0cc3f8b9 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 24 Oct 2023 10:38:53 +1100 Subject: [PATCH 01/12] Add PromiseCache --- Cargo.lock | 8 ++ Cargo.toml | 1 + common/promise_cache/Cargo.toml | 8 ++ common/promise_cache/src/lib.rs | 159 ++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+) create mode 100644 common/promise_cache/Cargo.toml create mode 100644 common/promise_cache/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 54e8523f1f4..43271a9f1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6024,6 +6024,14 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "promise_cache" +version = "0.1.0" +dependencies = [ + "derivative", + "oneshot_broadcast", +] + [[package]] name = "proto_array" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 49411340bd4..8310aba5848 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ members = [ "common/malloc_utils", "common/oneshot_broadcast", "common/pretty_reqwest_error", + "common/promise_cache", "common/sensitive_url", "common/slot_clock", "common/system_health", diff --git a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml new file mode 100644 index 00000000000..3ff5e324e0d --- /dev/null +++ b/common/promise_cache/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "promise_cache" +version = "0.1.0" +edition.workspace = true + +[dependencies] +derivative = { workspace = true } +oneshot_broadcast = { path = "../oneshot_broadcast" } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs new file mode 100644 index 00000000000..99c66fb24ca --- /dev/null +++ b/common/promise_cache/src/lib.rs @@ -0,0 +1,159 @@ +use derivative::Derivative; +use oneshot_broadcast::{oneshot, Receiver, Sender}; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +#[derive(Derivative)] +#[derivative(Clone(bound = ""))] +pub enum CacheItem { + Complete(Arc), + Promise(Receiver>), +} + +pub enum 
PromiseCacheError { + Failed(oneshot_broadcast::Error), + MaxConcurrentPromises(usize), +} + +impl CacheItem { + pub fn is_promise(&self) -> bool { + matches!(self, CacheItem::Promise(_)) + } + + pub fn wait(self) -> Result, PromiseCacheError> { + match self { + CacheItem::Complete(value) => Ok(value), + CacheItem::Promise(receiver) => receiver.recv().map_err(PromiseCacheError::Failed), + } + } +} + +pub trait ToArc { + fn to_arc(&self) -> Arc; +} + +impl ToArc for Arc { + fn to_arc(&self) -> Arc { + self.clone() + } +} + +impl ToArc for T +where + T: Clone, +{ + fn to_arc(&self) -> Arc { + Arc::new(self.clone()) + } +} + +pub struct PromiseCache +where + K: Hash + Eq + Clone, +{ + cache: HashMap>, + capacity: usize, + max_concurrent_promises: usize, +} + +impl PromiseCache +where + K: Hash + Eq + Clone, +{ + pub fn new(capacity: usize, max_concurrent_promises: usize) -> Self { + Self { + cache: HashMap::new(), + capacity, + max_concurrent_promises, + } + } + + pub fn get(&mut self, key: &K) -> Option> { + match self.cache.get(key) { + // The cache contained the value, return it. + item @ Some(CacheItem::Complete(_)) => item.cloned(), + // The cache contains a promise for the value. Check to see if the promise has already + // been resolved, without waiting for it. + item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { + // The promise has already been resolved. Replace the entry in the cache with a + // `Complete` entry and then return the value. + Ok(Some(value)) => { + let ready = CacheItem::Complete(value); + self.insert_cache_item(key.clone(), ready.clone()); + Some(ready) + } + // The promise has not yet been resolved. Return the promise so the caller can await + // it. + Ok(None) => item.cloned(), + // The sender has been dropped without sending a value. There was most likely an + // error computing the value. Drop the key from the cache and return + // `None` so the caller can recompute the value. 
+ // + // It's worth noting that this is the only place where we removed unresolved + // promises from the cache. This means unresolved promises will only be removed if + // we try to access them again. This is OK, since the promises don't consume much + // memory. We expect that *all* promises should be resolved, unless there is a + // programming or database error. + Err(oneshot_broadcast::Error::SenderDropped) => { + self.cache.remove(key); + None + } + }, + // The cache does not have this value and it's not already promised to be computed. + None => None, + } + } + + pub fn contains(&self, key: &K) -> bool { + self.cache.contains_key(key) + } + + pub fn insert_value>(&mut self, key: K, value: &C) { + if self + .cache + .get(&key) + // Replace the value if it's not present or if it's a promise. A bird in the hand is + // worth two in the promise-bush! + .map_or(true, CacheItem::is_promise) + { + self.insert_cache_item(key, CacheItem::Complete(value.to_arc())); + } + } + + /// Prunes the cache first before inserting a new item. 
+ fn insert_cache_item(&mut self, key: K, cache_item: CacheItem) { + self.prune_cache(); + self.cache.insert(key, cache_item); + } + + pub fn create_promise(&mut self, key: K) -> Result>, PromiseCacheError> { + let num_active_promises = self.cache.values().filter(|item| item.is_promise()).count(); + if num_active_promises >= self.max_concurrent_promises { + return Err(PromiseCacheError::MaxConcurrentPromises( + num_active_promises, + )); + } + + let (sender, receiver) = oneshot(); + self.insert_cache_item(key, CacheItem::Promise(receiver)); + Ok(sender) + } + + fn prune_cache(&mut self) { + let target_cache_size = self.capacity.saturating_sub(1); + if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { + // FIXME(sproul): implement type-specific pruning + let keys_to_prune = self + .cache + .keys() + .take(prune_count) + .cloned() + .collect::>(); + + for key in &keys_to_prune { + self.cache.remove(key); + } + } + } +} From 304aaa98058ce261ecdbfe5c412b82c2cdb07943 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 25 Oct 2023 17:55:40 +1100 Subject: [PATCH 02/12] Use promise cache in HTTP API --- Cargo.lock | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 9 ++++ beacon_node/beacon_chain/src/builder.rs | 3 ++ beacon_node/http_api/src/state_id.rs | 48 ++++++++++++++++++++ common/promise_cache/src/lib.rs | 1 + 6 files changed, 63 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 43271a9f1f2..d4eead100eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -529,6 +529,7 @@ dependencies = [ "oneshot_broadcast", "operation_pool", "parking_lot 0.12.1", + "promise_cache", "proto_array", "rand 0.8.5", "rayon", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 8420e4410f4..b7c3482146a 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -45,6 +45,7 @@ merkle_proof = { workspace = true } oneshot_broadcast = { path = 
"../../common/oneshot_broadcast/" } operation_pool = { workspace = true } parking_lot = { workspace = true } +promise_cache = { path = "../../common/promise_cache" } proto_array = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 41f609eb86a..c0b0d4914f2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -84,6 +84,7 @@ use itertools::Itertools; use kzg::Kzg; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; +use promise_cache::PromiseCache; use proto_array::{DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; use slasher::Slasher; @@ -458,6 +459,10 @@ pub struct BeaconChain { pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, + /// A cache used to de-duplicate HTTP state requests. + /// + /// The cache is keyed by `state_root`. + pub http_state_cache: Arc>>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. 
pub shutdown_sender: Sender, @@ -6316,6 +6321,10 @@ impl BeaconChain { pub fn data_availability_boundary(&self) -> Option { self.data_availability_checker.data_availability_boundary() } + + pub fn logger(&self) -> &Logger { + &self.log + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c58c8bb0d2f..69365e755ba 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -22,6 +22,7 @@ use futures::channel::mpsc::Sender; use kzg::{Kzg, TrustedSetup}; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; +use promise_cache::PromiseCache; use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; use slasher::Slasher; use slog::{crit, debug, error, info, Logger}; @@ -946,6 +947,8 @@ where beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), + // FIXME(sproul): make configurable + http_state_cache: Arc::new(RwLock::new(PromiseCache::new(3, 8))), validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index fdc99fa954e..57f75c91726 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,8 +1,10 @@ use crate::ExecutionOptimistic; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; +use slog::{info, warn}; use std::fmt; use std::str::FromStr; +use std::sync::Arc; use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot}; /// Wraps `eth2::types::StateId` and provides common state-access functionality. E.g., reading @@ -187,6 +189,49 @@ impl StateId { _ => (self.root(chain)?, None), }; + let mut opt_state_cache = Some(chain.http_state_cache.write()); + + // Try the cache. 
+ if let Some(cache_item) = opt_state_cache + .as_mut() + .and_then(|cache| cache.get(&state_root)) + { + drop(opt_state_cache.take()); + match cache_item.wait() { + Ok(state) => { + info!( + chain.logger(), + "HTTP state cache hit"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + return Ok(((*state).clone(), execution_optimistic, finalized)); + } + Err(e) => { + warn!( + chain.logger(), + "State promise failed"; + "state_root" => ?state_root, + "outcome" => "re-computing", + "error" => ?e, + ); + } + } + } + + // Re-lock only in case of failed promise. + warn!( + chain.logger(), + "HTTP state cache miss"; + "state_root" => ?state_root + ); + let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.http_state_cache.write()); + + let sender = state_cache.create_promise(state_root).map_err(|e| { + warp_utils::reject::custom_server_error(format!("too many concurrent requests: {e:?}")) + })?; + drop(state_cache); + let state = chain .get_state(&state_root, slot_opt) .map_err(warp_utils::reject::beacon_chain_error) @@ -199,6 +244,9 @@ impl StateId { }) })?; + // Fulfil promise. 
+ sender.send(Arc::new(state.clone())); + Ok((state, execution_optimistic, finalized)) } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index 99c66fb24ca..a3646bc8a61 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -11,6 +11,7 @@ pub enum CacheItem { Promise(Receiver>), } +#[derive(Debug)] pub enum PromiseCacheError { Failed(oneshot_broadcast::Error), MaxConcurrentPromises(usize), From 68b33f2dbabf5a9c1d0ea2353686e74e757779c3 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 10 Jan 2024 17:46:54 +1100 Subject: [PATCH 03/12] Use promise cache as shuffling cache --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 14 +- beacon_node/beacon_chain/src/builder.rs | 5 +- .../beacon_chain/src/canonical_head.rs | 4 +- beacon_node/beacon_chain/src/errors.rs | 4 +- .../beacon_chain/src/http_state_cache.rs | 22 ++ beacon_node/beacon_chain/src/lib.rs | 1 + .../beacon_chain/src/shuffling_cache.rs | 224 ++---------------- .../beacon_chain/src/state_advance_timer.rs | 2 +- beacon_node/http_api/src/lib.rs | 2 +- .../gossip_methods.rs | 2 +- common/promise_cache/Cargo.toml | 1 + common/promise_cache/src/lib.rs | 53 +++-- 13 files changed, 98 insertions(+), 237 deletions(-) create mode 100644 beacon_node/beacon_chain/src/http_state_cache.rs diff --git a/Cargo.lock b/Cargo.lock index d4eead100eb..2c5e07d8751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6030,6 +6030,7 @@ name = "promise_cache" version = "0.1.0" dependencies = [ "derivative", + "itertools", "oneshot_broadcast", ] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c0b0d4914f2..203d6972882 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -32,6 +32,7 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, 
ForkChoiceWaitResult}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; +use crate::http_state_cache::HttpStateCache; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -84,7 +85,6 @@ use itertools::Itertools; use kzg::Kzg; use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella}; use parking_lot::{Mutex, RwLock}; -use promise_cache::PromiseCache; use proto_array::{DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; use slasher::Slasher; @@ -462,7 +462,7 @@ pub struct BeaconChain { /// A cache used to de-duplicate HTTP state requests. /// /// The cache is keyed by `state_root`. - pub http_state_cache: Arc>>>, + pub http_state_cache: Arc>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, @@ -3787,7 +3787,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id, committee_cache); + .insert_value(shuffling_id, committee_cache); } } Ok(()) @@ -5917,7 +5917,7 @@ impl BeaconChain { // access. drop(shuffling_cache); - let committee_cache = cache_item.wait()?; + let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?; map_fn(&committee_cache, shuffling_id.shuffling_decision_block) } else { // Create an entry in the cache that "promises" this value will eventually be computed. @@ -5926,7 +5926,9 @@ impl BeaconChain { // // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same // promise from being created twice. 
- let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + let sender = shuffling_cache + .create_promise(shuffling_id.clone()) + .map_err(Error::ShufflingCacheError)?; // Drop the shuffling cache to avoid holding the lock for any longer than // required. @@ -6020,7 +6022,7 @@ impl BeaconChain { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(Error::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id, &committee_cache); + .insert_value(shuffling_id, &committee_cache); metrics::stop_timer(committee_building_timer); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 69365e755ba..a397b125f2d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -938,17 +938,18 @@ where fork_choice_signal_rx, event_handler: self.event_handler, head_tracker, + // FIXME(sproul): make configurable shuffling_cache: TimeoutRwLock::new(ShufflingCache::new( shuffling_cache_size, head_shuffling_ids, - log.clone(), + 2, )), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), // FIXME(sproul): make configurable - http_state_cache: Arc::new(RwLock::new(PromiseCache::new(3, 8))), + http_state_cache: Arc::new(RwLock::new(PromiseCache::new(3, Default::default(), 8))), validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 12980d6a802..f886cc20cae 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -823,9 +823,7 @@ impl BeaconChain { Ok(head_shuffling_ids) => { self.shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .map(|mut shuffling_cache| { - 
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids) - }) + .map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids)) .unwrap_or_else(|| { error!( self.log, diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 1059bb42f87..ee79ec43bae 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -211,8 +211,8 @@ pub enum BeaconChainError { }, AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, - CommitteePromiseFailed(oneshot_broadcast::Error), - MaxCommitteePromises(usize), + ShufflingCacheError(promise_cache::PromiseCacheError), + HttpCacheError(promise_cache::PromiseCacheError), BlsToExecutionPriorToCapella, BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), diff --git a/beacon_node/beacon_chain/src/http_state_cache.rs b/beacon_node/beacon_chain/src/http_state_cache.rs new file mode 100644 index 00000000000..5413119144c --- /dev/null +++ b/beacon_node/beacon_chain/src/http_state_cache.rs @@ -0,0 +1,22 @@ +use promise_cache::{PromiseCache, Protect}; +use types::{BeaconState, EthSpec, Hash256}; + +#[derive(Default)] +pub struct HttpProtector; + +impl Protect for HttpProtector { + type SortKey = Hash256; + + /// We don't care too much about preventing evictions of particular states here. All the states + /// in this cache should be different from the head state. + fn protect_from_eviction(&self, _: &Hash256) -> bool { + false + } + + /// Evict in arbitrary (hash) order. 
+ fn sort_key(&self, k: &Hash256) -> Self::SortKey { + *k + } +} + +pub type HttpStateCache = PromiseCache, HttpProtector>; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5c79b77b0af..a0974934d9b 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -27,6 +27,7 @@ pub mod fork_choice_signal; pub mod fork_revert; mod head_tracker; pub mod historical_blocks; +mod http_state_cache; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 956520cdcfc..d0f3abfdb8d 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,17 +1,15 @@ -use std::collections::HashMap; -use std::sync::Arc; - +use crate::{metrics, BeaconChainError}; use itertools::Itertools; -use slog::{debug, Logger}; - use oneshot_broadcast::{oneshot, Receiver, Sender}; +use promise_cache::{PromiseCache, Protect}; +use slog::{debug, Logger}; +use std::collections::HashMap; +use std::sync::Arc; use types::{ beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, }; -use crate::{metrics, BeaconChainError}; - /// The size of the cache that stores committee caches for quicker verification. /// /// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + @@ -28,210 +26,22 @@ pub const DEFAULT_CACHE_SIZE: usize = 16; /// always be inserted during block import. Unstable networks with a high degree of forking might /// see some attestations dropped due to this concurrency limit, however I propose that this is /// better than low-resource nodes going OOM. -const MAX_CONCURRENT_PROMISES: usize = 2; - -#[derive(Clone)] -pub enum CacheItem { - /// A committee. 
- Committee(Arc), - /// A promise for a future committee. - Promise(Receiver>), -} - -impl CacheItem { - pub fn is_promise(&self) -> bool { - matches!(self, CacheItem::Promise(_)) - } - - pub fn wait(self) -> Result, BeaconChainError> { - match self { - CacheItem::Committee(cache) => Ok(cache), - CacheItem::Promise(receiver) => receiver - .recv() - .map_err(BeaconChainError::CommitteePromiseFailed), - } - } -} - -/// Provides a cache for `CommitteeCache`. -/// -/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like -/// a find/replace error. -pub struct ShufflingCache { - cache: HashMap, - cache_size: usize, - head_shuffling_ids: BlockShufflingIds, - logger: Logger, -} - -impl ShufflingCache { - pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds, logger: Logger) -> Self { - Self { - cache: HashMap::new(), - cache_size, - head_shuffling_ids, - logger, - } - } - - pub fn get(&mut self, key: &AttestationShufflingId) -> Option { - match self.cache.get(key) { - // The cache contained the committee cache, return it. - item @ Some(CacheItem::Committee(_)) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - item.cloned() - } - // The cache contains a promise for the committee cache. Check to see if the promise has - // already been resolved, without waiting for it. - item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { - // The promise has already been resolved. Replace the entry in the cache with a - // `Committee` entry and then return the committee. - Ok(Some(committee)) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - let ready = CacheItem::Committee(committee); - self.insert_cache_item(key.clone(), ready.clone()); - Some(ready) - } - // The promise has not yet been resolved. Return the promise so the caller can await - // it. 
- Ok(None) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - item.cloned() - } - // The sender has been dropped without sending a committee. There was most likely an - // error computing the committee cache. Drop the key from the cache and return - // `None` so the caller can recompute the committee. - // - // It's worth noting that this is the only place where we removed unresolved - // promises from the cache. This means unresolved promises will only be removed if - // we try to access them again. This is OK, since the promises don't consume much - // memory. We expect that *all* promises should be resolved, unless there is a - // programming or database error. - Err(oneshot_broadcast::Error::SenderDropped) => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); - self.cache.remove(key); - None - } - }, - // The cache does not have this committee and it's not already promised to be computed. - None => { - metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); - None - } - } - } - - pub fn contains(&self, key: &AttestationShufflingId) -> bool { - self.cache.contains_key(key) - } - - pub fn insert_committee_cache( - &mut self, - key: AttestationShufflingId, - committee_cache: &C, - ) { - if self - .cache - .get(&key) - // Replace the committee if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! - .map_or(true, CacheItem::is_promise) - { - self.insert_cache_item( - key, - CacheItem::Committee(committee_cache.to_arc_committee_cache()), - ); - } - } - - /// Prunes the cache first before inserting a new cache item. 
- fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { - self.prune_cache(); - self.cache.insert(key, cache_item); - } +pub const DEFAULT_MAX_CONCURRENT_PROMISES: usize = 2; - /// Prunes the `cache` to keep the size below the `cache_size` limit, based on the following - /// preferences: - /// - Entries from more recent epochs are preferred over older ones. - /// - Entries with shuffling ids matching the head's previous, current, and future epochs must - /// not be pruned. - fn prune_cache(&mut self) { - let target_cache_size = self.cache_size.saturating_sub(1); - if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { - let shuffling_ids_to_prune = self - .cache - .keys() - .sorted_by_key(|key| key.shuffling_epoch) - .filter(|shuffling_id| { - Some(shuffling_id) - != self - .head_shuffling_ids - .id_for_epoch(shuffling_id.shuffling_epoch) - .as_ref() - .as_ref() - }) - .take(prune_count) - .cloned() - .collect::>(); - - for shuffling_id in shuffling_ids_to_prune.iter() { - debug!( - self.logger, - "Removing old shuffling from cache"; - "shuffling_epoch" => shuffling_id.shuffling_epoch, - "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block - ); - self.cache.remove(shuffling_id); - } - } - } - - pub fn create_promise( - &mut self, - key: AttestationShufflingId, - ) -> Result>, BeaconChainError> { - let num_active_promises = self - .cache - .iter() - .filter(|(_, item)| item.is_promise()) - .count(); - if num_active_promises >= MAX_CONCURRENT_PROMISES { - return Err(BeaconChainError::MaxCommitteePromises(num_active_promises)); - } +impl Protect for BlockShufflingIds { + type SortKey = Epoch; - let (sender, receiver) = oneshot(); - self.insert_cache_item(key, CacheItem::Promise(receiver)); - Ok(sender) + fn sort_key(&self, k: &AttestationShufflingId) -> Epoch { + k.shuffling_epoch } - /// Inform the cache that the shuffling decision roots for the head has changed. 
- /// - /// The shufflings for the head's previous, current, and future epochs will never be ejected from - /// the cache during `Self::insert_cache_item`. - pub fn update_head_shuffling_ids(&mut self, head_shuffling_ids: BlockShufflingIds) { - self.head_shuffling_ids = head_shuffling_ids; + fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool { + Some(shuffling_id) != self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() } } -/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. -pub trait ToArcCommitteeCache { - fn to_arc_committee_cache(&self) -> Arc; -} - -impl ToArcCommitteeCache for CommitteeCache { - fn to_arc_committee_cache(&self) -> Arc { - Arc::new(self.clone()) - } -} - -impl ToArcCommitteeCache for Arc { - fn to_arc_committee_cache(&self) -> Arc { - self.clone() - } -} +/// FIXME(sproul): restore logger? +pub type ShufflingCache = PromiseCache; /// Contains the shuffling IDs for a beacon block. #[derive(Clone)] @@ -316,7 +126,11 @@ mod test { block_root: Hash256::from_low_u64_le(0), }; let logger = null_logger().unwrap(); - ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids, logger) + ShufflingCache::new( + TEST_CACHE_SIZE, + head_shuffling_ids, + DEFAULT_MAX_CONCURRENT_PROMISES, + ) } /// Returns two different committee caches for testing. diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 80d9ce252de..5f01dc96cc8 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -407,7 +407,7 @@ fn advance_head( .shuffling_cache .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) .ok_or(BeaconChainError::AttestationCacheLockTimeout)? 
- .insert_committee_cache(shuffling_id.clone(), committee_cache); + .insert_value(shuffling_id.clone(), committee_cache); debug!( log, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a658346f57e..a07409ab8c9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -958,7 +958,7 @@ pub fn serve( .shuffling_cache .try_write_for(std::time::Duration::from_secs(1)) { - cache_write.insert_committee_cache( + cache_write.insert_value( shuffling_id, &possibly_built_cache, ); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 9fe64d159f2..1a12a36e06b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -2349,7 +2349,7 @@ impl NetworkBeaconProcessor { debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } - e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => { + AttnError::BeaconChainError(BeaconChainError::ShufflingCacheError(e)) => { debug!( self.log, "Dropping attestation"; diff --git a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml index 3ff5e324e0d..e8295faa830 100644 --- a/common/promise_cache/Cargo.toml +++ b/common/promise_cache/Cargo.toml @@ -6,3 +6,4 @@ edition.workspace = true [dependencies] derivative = { workspace = true } oneshot_broadcast = { path = "../oneshot_broadcast" } +itertools = { workspace = true } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index a3646bc8a61..5f2889c5188 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -1,9 +1,32 @@ use derivative::Derivative; +use itertools::Itertools; use oneshot_broadcast::{oneshot, Receiver, Sender}; use 
std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; +pub struct PromiseCache +where + K: Hash + Eq + Clone, + P: Protect, +{ + cache: HashMap>, + capacity: usize, + protector: P, + max_concurrent_promises: usize, +} + +/// A value implementing `Protect` is capable of preventing keys of type `K` from being evicted. +/// +/// It also dictates an ordering on key-value pairs which is used to prioritise evictions. +pub trait Protect { + type SortKey: Ord; + + fn protect_from_eviction(&self, k: &K) -> bool; + + fn sort_key(&self, k: &K) -> Self::SortKey; +} + #[derive(Derivative)] #[derivative(Clone(bound = ""))] pub enum CacheItem { @@ -17,6 +40,10 @@ pub enum PromiseCacheError { MaxConcurrentPromises(usize), } +pub trait ToArc { + fn to_arc(&self) -> Arc; +} + impl CacheItem { pub fn is_promise(&self) -> bool { matches!(self, CacheItem::Promise(_)) @@ -30,10 +57,6 @@ impl CacheItem { } } -pub trait ToArc { - fn to_arc(&self) -> Arc; -} - impl ToArc for Arc { fn to_arc(&self) -> Arc { self.clone() @@ -49,23 +72,16 @@ where } } -pub struct PromiseCache +impl PromiseCache where K: Hash + Eq + Clone, + P: Protect, { - cache: HashMap>, - capacity: usize, - max_concurrent_promises: usize, -} - -impl PromiseCache -where - K: Hash + Eq + Clone, -{ - pub fn new(capacity: usize, max_concurrent_promises: usize) -> Self { + pub fn new(capacity: usize, protector: P, max_concurrent_promises: usize) -> Self { Self { cache: HashMap::new(), capacity, + protector, max_concurrent_promises, } } @@ -144,10 +160,11 @@ where fn prune_cache(&mut self) { let target_cache_size = self.capacity.saturating_sub(1); if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { - // FIXME(sproul): implement type-specific pruning let keys_to_prune = self .cache .keys() + .filter(|k| !self.protector.protect_from_eviction(*k)) + .sorted_by_key(|k| self.protector.sort_key(k)) .take(prune_count) .cloned() .collect::>(); @@ -157,4 +174,8 @@ where } } } + + pub fn 
update_protector(&mut self, protector: P) { + self.protector = protector; + } } From 129287fc01d3927f6060a0ddb73f28cd893c9cd3 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 11:27:33 +1100 Subject: [PATCH 04/12] CLI wiring --- beacon_node/beacon_chain/src/beacon_chain.rs | 4 ++-- beacon_node/beacon_chain/src/builder.rs | 8 +++++--- beacon_node/beacon_chain/src/chain_config.rs | 6 ++++++ beacon_node/beacon_chain/src/lib.rs | 2 +- .../{http_state_cache.rs => parallel_state_cache.rs} | 6 +++--- beacon_node/http_api/src/state_id.rs | 4 ++-- beacon_node/src/cli.rs | 11 +++++++++++ common/promise_cache/src/lib.rs | 5 ++++- 8 files changed, 34 insertions(+), 12 deletions(-) rename beacon_node/beacon_chain/src/{http_state_cache.rs => parallel_state_cache.rs} (73%) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 203d6972882..c5bec920732 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -32,7 +32,6 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; -use crate::http_state_cache::HttpStateCache; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -53,6 +52,7 @@ use crate::observed_attesters::{ use crate::observed_blob_sidecars::ObservedBlobSidecars; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; +use crate::parallel_state_cache::ParallelStateCache; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use 
crate::pre_finalization_cache::PreFinalizationBlockCache; @@ -462,7 +462,7 @@ pub struct BeaconChain { /// A cache used to de-duplicate HTTP state requests. /// /// The cache is keyed by `state_root`. - pub http_state_cache: Arc>>, + pub parallel_state_cache: Arc>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a397b125f2d..6120634ebe7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -866,6 +866,7 @@ where let genesis_time = head_snapshot.beacon_state.genesis_time(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let shuffling_cache_size = self.chain_config.shuffling_cache_size; + let parallel_state_cache_size = self.chain_config.parallel_state_cache_size; // Calculate the weak subjectivity point in which to backfill blocks to. 
let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -942,14 +943,15 @@ where shuffling_cache: TimeoutRwLock::new(ShufflingCache::new( shuffling_cache_size, head_shuffling_ids, - 2, )), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), - // FIXME(sproul): make configurable - http_state_cache: Arc::new(RwLock::new(PromiseCache::new(3, Default::default(), 8))), + parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new( + parallel_state_cache_size, + Default::default(), + ))), validator_pubkey_cache, attester_cache: <_>::default(), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index bccc3732c3d..bc617ac362b 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -15,6 +15,9 @@ pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3; /// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet). pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24; +/// Cache only a small number of states in the parallel cache by default. +pub const DEFAULT_PARALLEL_STATE_CACHE_SIZE: usize = 2; + #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] pub struct ChainConfig { /// Maximum number of slots to skip when importing an attestation. @@ -83,6 +86,8 @@ pub struct ChainConfig { pub progressive_balances_mode: ProgressiveBalancesMode, /// Number of epochs between each migration of data from the hot database to the freezer. pub epochs_per_migration: u64, + /// Size of the promise cache for de-duplicating parallel state requests. 
+ pub parallel_state_cache_size: usize, } impl Default for ChainConfig { @@ -114,6 +119,7 @@ impl Default for ChainConfig { always_prepare_payload: false, progressive_balances_mode: ProgressiveBalancesMode::Checked, epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, + parallel_state_cache_size: DEFAULT_PARALLEL_STATE_CACHE_SIZE, } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index a0974934d9b..4afbbcd95a6 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -27,7 +27,6 @@ pub mod fork_choice_signal; pub mod fork_revert; mod head_tracker; pub mod historical_blocks; -mod http_state_cache; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; @@ -41,6 +40,7 @@ mod observed_blob_sidecars; pub mod observed_block_producers; pub mod observed_operations; pub mod otb_verification_service; +mod parallel_state_cache; mod persisted_beacon_chain; mod persisted_fork_choice; mod pre_finalization_cache; diff --git a/beacon_node/beacon_chain/src/http_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs similarity index 73% rename from beacon_node/beacon_chain/src/http_state_cache.rs rename to beacon_node/beacon_chain/src/parallel_state_cache.rs index 5413119144c..39fb337fdf2 100644 --- a/beacon_node/beacon_chain/src/http_state_cache.rs +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -2,9 +2,9 @@ use promise_cache::{PromiseCache, Protect}; use types::{BeaconState, EthSpec, Hash256}; #[derive(Default)] -pub struct HttpProtector; +pub struct ParallelStateProtector; -impl Protect for HttpProtector { +impl Protect for ParallelStateProtector { type SortKey = Hash256; /// We don't care too much about preventing evictions of particular states here. 
All the states @@ -19,4 +19,4 @@ impl Protect for HttpProtector { } } -pub type HttpStateCache = PromiseCache, HttpProtector>; +pub type ParallelStateCache = PromiseCache, ParallelStateProtector>; diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 57f75c91726..632efe106a2 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -189,7 +189,7 @@ impl StateId { _ => (self.root(chain)?, None), }; - let mut opt_state_cache = Some(chain.http_state_cache.write()); + let mut opt_state_cache = Some(chain.parallel_state_cache.write()); // Try the cache. if let Some(cache_item) = opt_state_cache @@ -225,7 +225,7 @@ impl StateId { "HTTP state cache miss"; "state_root" => ?state_root ); - let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.http_state_cache.write()); + let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.parallel_state_cache.write()); let sender = state_cache.create_promise(state_root).map_err(|e| { warp_utils::reject::custom_server_error(format!("too many concurrent requests: {e:?}")) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 67d97b0d5a0..ea7ebcae77e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -853,6 +853,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("0") ) + .arg( + Arg::with_name("parallel-state-cache-size") + .long("parallel-state-cache-size") + .value_name("N") + .help("Set the size of the cache used to de-duplicate requests for the same \ + state. This cache is additional to other state caches within Lighthouse \ + and should be kept small unless a large number of parallel requests for \ + different states are anticipated.") + .takes_value(true) + .default_value("2") + ) /* * Misc. 
diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index 5f2889c5188..ac83472028a 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -77,7 +77,10 @@ where K: Hash + Eq + Clone, P: Protect, { - pub fn new(capacity: usize, protector: P, max_concurrent_promises: usize) -> Self { + pub fn new(capacity: usize, protector: P) -> Self { + // Making the concurrent promises directly configurable is considered overkill for now, + // so we just derive a vaguely sensible value from the cache size. + let max_concurrent_promises = std::cmp::max(2, capacity / 8); Self { cache: HashMap::new(), capacity, From f876e583b415826dbdb4ba23dc641d8b16fe5014 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 11:42:13 +1100 Subject: [PATCH 05/12] Restore logging --- Cargo.lock | 1 + beacon_node/beacon_chain/src/builder.rs | 3 ++- beacon_node/beacon_chain/src/errors.rs | 1 - beacon_node/beacon_chain/src/parallel_state_cache.rs | 10 +++++----- beacon_node/beacon_chain/src/shuffling_cache.rs | 10 +++++++++- beacon_node/http_api/src/state_id.rs | 6 +++--- common/promise_cache/Cargo.toml | 1 + common/promise_cache/src/lib.rs | 12 +++++++++--- 8 files changed, 30 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c5e07d8751..1f0d844b5d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6032,6 +6032,7 @@ dependencies = [ "derivative", "itertools", "oneshot_broadcast", + "slog", ] [[package]] diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 6120634ebe7..1fa3254b91c 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -939,10 +939,10 @@ where fork_choice_signal_rx, event_handler: self.event_handler, head_tracker, - // FIXME(sproul): make configurable shuffling_cache: TimeoutRwLock::new(ShufflingCache::new( shuffling_cache_size, head_shuffling_ids, + log.clone(), )), eth1_finalization_cache: 
TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache: <_>::default(), @@ -951,6 +951,7 @@ where parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new( parallel_state_cache_size, Default::default(), + log.clone(), ))), validator_pubkey_cache, attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index ee79ec43bae..225e4c2c43a 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -212,7 +212,6 @@ pub enum BeaconChainError { AttestationHeadNotInForkChoice(Hash256), MissingPersistedForkChoice, ShufflingCacheError(promise_cache::PromiseCacheError), - HttpCacheError(promise_cache::PromiseCacheError), BlsToExecutionPriorToCapella, BlsToExecutionConflictsWithPool, InconsistentFork(InconsistentFork), diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs index 39fb337fdf2..4b4e6f5cf57 100644 --- a/beacon_node/beacon_chain/src/parallel_state_cache.rs +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -7,16 +7,16 @@ pub struct ParallelStateProtector; impl Protect for ParallelStateProtector { type SortKey = Hash256; + /// Evict in arbitrary (hash) order. + fn sort_key(&self, k: &Hash256) -> Self::SortKey { + *k + } + /// We don't care too much about preventing evictions of particular states here. All the states /// in this cache should be different from the head state. fn protect_from_eviction(&self, _: &Hash256) -> bool { false } - - /// Evict in arbitrary (hash) order. 
- fn sort_key(&self, k: &Hash256) -> Self::SortKey { - *k - } } pub type ParallelStateCache = PromiseCache, ParallelStateProtector>; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index d0f3abfdb8d..6886df2e577 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -38,9 +38,17 @@ impl Protect for BlockShufflingIds { fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool { Some(shuffling_id) != self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() } + + fn notify_eviction(&self, shuffling_id: &AttestationShufflingId, logger: &Logger) { + debug!( + logger, + "Removing old shuffling from cache"; + "shuffling_epoch" => shuffling_id.shuffling_epoch, + "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block + ); + } } -/// FIXME(sproul): restore logger? pub type ShufflingCache = PromiseCache; /// Contains the shuffling IDs for a beacon block. diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 632efe106a2..cd8db72bcf4 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,7 +1,7 @@ use crate::ExecutionOptimistic; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; -use slog::{info, warn}; +use slog::{debug, info, warn}; use std::fmt; use std::str::FromStr; use std::sync::Arc; @@ -199,7 +199,7 @@ impl StateId { drop(opt_state_cache.take()); match cache_item.wait() { Ok(state) => { - info!( + debug!( chain.logger(), "HTTP state cache hit"; "state_root" => ?state_root, @@ -220,7 +220,7 @@ impl StateId { } // Re-lock only in case of failed promise. 
- warn!( + debug!( chain.logger(), "HTTP state cache miss"; "state_root" => ?state_root diff --git a/common/promise_cache/Cargo.toml b/common/promise_cache/Cargo.toml index e8295faa830..b5fa42bd438 100644 --- a/common/promise_cache/Cargo.toml +++ b/common/promise_cache/Cargo.toml @@ -7,3 +7,4 @@ edition.workspace = true derivative = { workspace = true } oneshot_broadcast = { path = "../oneshot_broadcast" } itertools = { workspace = true } +slog = { workspace = true } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index ac83472028a..1df0e67a359 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -1,6 +1,7 @@ use derivative::Derivative; use itertools::Itertools; use oneshot_broadcast::{oneshot, Receiver, Sender}; +use slog::Logger; use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; @@ -14,17 +15,20 @@ where capacity: usize, protector: P, max_concurrent_promises: usize, + logger: Logger, } /// A value implementing `Protect` is capable of preventing keys of type `K` from being evicted. /// -/// It also dictates an ordering on key-value pairs which is used to prioritise evictions. +/// It also dictates an ordering on keys which is used to prioritise evictions. pub trait Protect { type SortKey: Ord; + fn sort_key(&self, k: &K) -> Self::SortKey; + fn protect_from_eviction(&self, k: &K) -> bool; - fn sort_key(&self, k: &K) -> Self::SortKey; + fn notify_eviction(&self, _k: &K, _log: &Logger) {} } #[derive(Derivative)] @@ -77,7 +81,7 @@ where K: Hash + Eq + Clone, P: Protect, { - pub fn new(capacity: usize, protector: P) -> Self { + pub fn new(capacity: usize, protector: P, logger: Logger) -> Self { // Making the concurrent promises directly configurable is considered overkill for now, // so we just derive a vaguely sensible value from the cache size. 
let max_concurrent_promises = std::cmp::max(2, capacity / 8); @@ -86,6 +90,7 @@ where capacity, protector, max_concurrent_promises, + logger, } } @@ -173,6 +178,7 @@ where .collect::>(); for key in &keys_to_prune { + self.protector.notify_eviction(key, &self.logger); self.cache.remove(key); } } From 2bbc69a3a17895a7603fd993af458170e78da7a3 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 11:49:33 +1100 Subject: [PATCH 06/12] Fix and test CLI wiring --- beacon_node/http_api/src/state_id.rs | 2 +- beacon_node/src/config.rs | 5 +++++ lighthouse/tests/beacon_node.rs | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index cd8db72bcf4..c8aaba9dc87 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -1,7 +1,7 @@ use crate::ExecutionOptimistic; use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::types::StateId as CoreStateId; -use slog::{debug, info, warn}; +use slog::{debug, warn}; use std::fmt; use std::str::FromStr; use std::sync::Arc; diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index d1f64eb751d..8d2bac22b77 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -427,6 +427,11 @@ pub fn get_config( if let Some(state_cache_size) = clap_utils::parse_optional(cli_args, "state-cache-size")? { client_config.store.state_cache_size = state_cache_size; } + if let Some(parallel_state_cache_size) = + clap_utils::parse_optional(cli_args, "parallel-state-cache-size")? + { + client_config.chain.parallel_state_cache_size = parallel_state_cache_size; + } if let Some(diff_buffer_cache_size) = clap_utils::parse_optional(cli_args, "diff-buffer-cache-size")? 
{ diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 3e03b18e61f..2c13429f01a 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1899,6 +1899,25 @@ fn historic_state_cache_size_default() { }); } #[test] +fn parallel_state_cache_size_flag() { + CommandLineTest::new() + .flag("parallel-state-cache-size", Some("4")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.parallel_state_cache_size, 4_usize)); +} +#[test] +fn parallel_state_cache_size_default() { + use beacon_node::beacon_chain::chain_config::DEFAULT_PARALLEL_STATE_CACHE_SIZE; + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.parallel_state_cache_size, + DEFAULT_PARALLEL_STATE_CACHE_SIZE + ); + }); +} +#[test] fn auto_compact_db_flag() { CommandLineTest::new() .flag("auto-compact-db", Some("false")) From 35ff1981d8a18f9e9b18323b7fb8d84766289ba6 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 12:29:23 +1100 Subject: [PATCH 07/12] Fix protect bug & tests --- .../beacon_chain/src/shuffling_cache.rs | 90 ++++++++----------- common/promise_cache/src/lib.rs | 22 +++++ 2 files changed, 60 insertions(+), 52 deletions(-) diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 6886df2e577..95893c0e4c6 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,4 +1,4 @@ -use crate::{metrics, BeaconChainError}; +use crate::metrics; use itertools::Itertools; use oneshot_broadcast::{oneshot, Receiver, Sender}; use promise_cache::{PromiseCache, Protect}; @@ -15,18 +15,18 @@ use types::{ /// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + /// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. 
(Note: this /// ignores a few extra bytes in the caches that should be insignificant compared to the indices). -pub const DEFAULT_CACHE_SIZE: usize = 16; - -/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this -/// limits the number of concurrent states that can be loaded into memory for the committee cache. -/// This prevents excessive memory usage at the cost of rejecting some attestations. +/// +/// The cache size also determines the maximum number of concurrent committee cache "promises" that +/// can be issued. In effect, this limits the number of concurrent states that can be loaded into +/// memory for the committee cache. This prevents excessive memory usage at the cost of rejecting +/// some attestations. /// /// We set this value to 2 since states can be quite large and have a significant impact on memory /// usage. A healthy network cannot have more than a few committee caches and those caches should /// always be inserted during block import. Unstable networks with a high degree of forking might /// see some attestations dropped due to this concurrency limit, however I propose that this is /// better than low-resource nodes going OOM. -pub const DEFAULT_MAX_CONCURRENT_PROMISES: usize = 2; +pub const DEFAULT_CACHE_SIZE: usize = 16; impl Protect for BlockShufflingIds { type SortKey = Epoch; @@ -36,7 +36,7 @@ impl Protect for BlockShufflingIds { } fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool { - Some(shuffling_id) != self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() + Some(shuffling_id) == self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref() } fn notify_eviction(&self, shuffling_id: &AttestationShufflingId, logger: &Logger) { @@ -52,7 +52,7 @@ impl Protect for BlockShufflingIds { pub type ShufflingCache = PromiseCache; /// Contains the shuffling IDs for a beacon block. 
-#[derive(Clone)] +#[derive(Debug, Clone)] pub struct BlockShufflingIds { pub current: AttestationShufflingId, pub next: AttestationShufflingId, @@ -112,13 +112,12 @@ impl BlockShufflingIds { #[cfg(not(debug_assertions))] #[cfg(test)] mod test { + use super::*; + use crate::test_utils::EphemeralHarnessType; + use promise_cache::{CacheItem, PromiseCacheError}; use task_executor::test_utils::null_logger; use types::*; - use crate::test_utils::EphemeralHarnessType; - - use super::*; - type E = MinimalEthSpec; type TestBeaconChainType = EphemeralHarnessType; type BeaconChainHarness = crate::test_utils::BeaconChainHarness; @@ -134,11 +133,7 @@ mod test { block_root: Hash256::from_low_u64_le(0), }; let logger = null_logger().unwrap(); - ShufflingCache::new( - TEST_CACHE_SIZE, - head_shuffling_ids, - DEFAULT_MAX_CONCURRENT_PROMISES, - ) + ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids, logger) } /// Returns two different committee caches for testing. @@ -194,10 +189,10 @@ mod test { // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Complete(committee) if committee == committee_a), "the promise should be resolved" ); - assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); + assert_eq!(cache.len(), 1, "the cache should have one entry"); } #[test] @@ -221,7 +216,7 @@ mod test { // Ensure the key now indicates an empty slot. assert!(cache.get(&id_a).is_none(), "the slot should be empty"); - assert!(cache.cache.is_empty(), "the cache should be empty"); + assert!(cache.is_empty(), "the cache should be empty"); } #[test] @@ -255,7 +250,7 @@ mod test { // Ensure promise A has been resolved. 
let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Complete(committee) if committee == committee_a), "promise A should be resolved" ); @@ -264,41 +259,40 @@ mod test { // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_b), + matches!(item, CacheItem::Complete(committee) if committee == committee_b), "promise B should be resolved" ); // Check both entries again. assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee) if committee == committee_a), "promise A should remain resolved" ); assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + matches!(cache.get(&id_b).unwrap(), CacheItem::Complete(committee) if committee == committee_b), "promise B should remain resolved" ); - assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); + assert_eq!(cache.len(), 2, "the cache should have two entries"); } #[test] fn too_many_promises() { let mut cache = new_shuffling_cache(); - for i in 0..MAX_CONCURRENT_PROMISES { + for i in 0..cache.max_concurrent_promises() { cache.create_promise(shuffling_id(i as u64)).unwrap(); } // Ensure that the next promise returns an error. It is important for the application to // dump his ass when he can't keep his promises, you're a queen and you deserve better. 
assert!(matches!( - cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)), - Err(BeaconChainError::MaxCommitteePromises( - MAX_CONCURRENT_PROMISES - )) + cache.create_promise(shuffling_id(cache.max_concurrent_promises() as u64)), + Err(PromiseCacheError::MaxConcurrentPromises(n)) + if n == cache.max_concurrent_promises() )); assert_eq!( - cache.cache.len(), - MAX_CONCURRENT_PROMISES, + cache.len(), + cache.max_concurrent_promises(), "the cache should have two entries" ); } @@ -308,9 +302,9 @@ mod test { let mut cache = new_shuffling_cache(); let id_a = shuffling_id(1); let committee_cache_a = Arc::new(CommitteeCache::default()); - cache.insert_committee_cache(id_a.clone(), &committee_cache_a); + cache.insert_value(id_a.clone(), &committee_cache_a); assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee_cache) if committee_cache == committee_cache_a), "should insert committee cache" ); } @@ -323,7 +317,7 @@ mod test { .collect::>(); for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { - cache.insert_committee_cache(shuffling_id.clone(), committee_cache); + cache.insert_value(shuffling_id.clone(), committee_cache); } for i in 1..(TEST_CACHE_SIZE + 1) { @@ -337,11 +331,7 @@ mod test { !cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0), "should not contain oldest epoch shuffling id" ); - assert_eq!( - cache.cache.len(), - cache.cache_size, - "should limit cache size" - ); + assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); } #[test] @@ -356,7 +346,7 @@ mod test { shuffling_epoch: (current_epoch + 1).into(), shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_value(shuffling_id, &committee_cache); } // Now, update the head shuffling ids @@ -366,12 
+356,12 @@ mod test { previous: Some(shuffling_id(current_epoch - 1)), block_root: Hash256::from_low_u64_le(42), }; - cache.update_head_shuffling_ids(head_shuffling_ids.clone()); + cache.update_protector(head_shuffling_ids.clone()); // Insert head state shuffling ids. Should not be overridden by other shuffling ids. - cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache); - cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache); - cache.insert_committee_cache( + cache.insert_value(head_shuffling_ids.current.clone(), &committee_cache); + cache.insert_value(head_shuffling_ids.next.clone(), &committee_cache); + cache.insert_value( head_shuffling_ids.previous.clone().unwrap(), &committee_cache, ); @@ -382,7 +372,7 @@ mod test { shuffling_epoch: Epoch::from(i), shuffling_decision_block: Hash256::from_low_u64_be(i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_value(shuffling_id, &committee_cache); } assert!( @@ -397,10 +387,6 @@ mod test { cache.contains(&head_shuffling_ids.previous.unwrap()), "should retain head shuffling id for previous epoch." 
); - assert_eq!( - cache.cache.len(), - cache.cache_size, - "should limit cache size" - ); + assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size"); } } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index 1df0e67a359..9783be7cab9 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; +#[derive(Debug)] pub struct PromiseCache where K: Hash + Eq + Clone, @@ -38,6 +39,15 @@ pub enum CacheItem { Promise(Receiver>), } +impl std::fmt::Debug for CacheItem { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + CacheItem::Complete(value) => value.fmt(f), + CacheItem::Promise(_) => "Promise(..)".fmt(f), + } + } +} + #[derive(Debug)] pub enum PromiseCacheError { Failed(oneshot_broadcast::Error), @@ -187,4 +197,16 @@ where pub fn update_protector(&mut self, protector: P) { self.protector = protector; } + + pub fn len(&self) -> usize { + self.cache.len() + } + + pub fn is_empty(&self) -> bool { + self.cache.is_empty() + } + + pub fn max_concurrent_promises(&self) -> usize { + self.max_concurrent_promises + } } From e3e02b152d0d8699727047f0a52765483ea58be2 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 12:39:24 +1100 Subject: [PATCH 08/12] Remove unused import --- beacon_node/beacon_chain/src/shuffling_cache.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 95893c0e4c6..1c1aaf728e1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,4 +1,3 @@ -use crate::metrics; use itertools::Itertools; use oneshot_broadcast::{oneshot, Receiver, Sender}; use promise_cache::{PromiseCache, Protect}; From 941ee3f55d79815d7cf893e5dfa280cc971f4bd4 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: 
Thu, 11 Jan 2024 13:23:45 +1100 Subject: [PATCH 09/12] Simplify and fix lints --- Cargo.lock | 1 - beacon_node/beacon_chain/Cargo.toml | 1 - beacon_node/beacon_chain/src/parallel_state_cache.rs | 2 +- beacon_node/beacon_chain/src/shuffling_cache.rs | 4 ---- 4 files changed, 1 insertion(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6500fae04e8..327ee846709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,7 +569,6 @@ dependencies = [ "lru", "maplit", "merkle_proof", - "oneshot_broadcast", "operation_pool", "parking_lot 0.12.1", "promise_cache", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 3dbe9eba47b..aa169f663dc 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -43,7 +43,6 @@ lighthouse_metrics = { workspace = true } logging = { workspace = true } lru = { workspace = true } merkle_proof = { workspace = true } -oneshot_broadcast = { path = "../../common/oneshot_broadcast/" } operation_pool = { workspace = true } parking_lot = { workspace = true } promise_cache = { path = "../../common/promise_cache" } diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs index 4b4e6f5cf57..a3c3f76fd7f 100644 --- a/beacon_node/beacon_chain/src/parallel_state_cache.rs +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -1,5 +1,5 @@ use promise_cache::{PromiseCache, Protect}; -use types::{BeaconState, EthSpec, Hash256}; +use types::{BeaconState, Hash256}; #[derive(Default)] pub struct ParallelStateProtector; diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 1c1aaf728e1..47d45074483 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,9 +1,5 @@ -use itertools::Itertools; -use oneshot_broadcast::{oneshot, Receiver, Sender}; use promise_cache::{PromiseCache, Protect}; use 
slog::{debug, Logger}; -use std::collections::HashMap; -use std::sync::Arc; use types::{ beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, From cc7786bc1da4e272b4eba4db9add5072fa34b52b Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 14:17:30 +1100 Subject: [PATCH 10/12] Update book text --- book/src/help_bn.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/book/src/help_bn.md b/book/src/help_bn.md index bb4d7a10c64..6dd4a8ee600 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -117,6 +117,8 @@ FLAGS: --eth1` pre-merge --subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also advertise the beacon node as being long-lived subscribed to all subnets. + --unsafe-and-dangerous-mode Don't use this flag unless you know what you're doing. Go back and + download a stable Lighthouse release --validator-monitor-auto Enables the automatic detection and monitoring of validators connected to the HTTP API and using the subnet subscription endpoint. This generally has the effect of providing additional logging and metrics for locally @@ -195,6 +197,9 @@ OPTIONS: --checkpoint-sync-url-timeout Set the timeout for checkpoint sync calls to remote beacon node HTTP endpoint. [default: 180] + --compression-level + Compression level (-99 to 22) for zstd compression applied to states on disk [default: 1]. You may change + the compression level freely without re-syncing. -d, --datadir Used to specify a custom root data directory for lighthouse keys and databases. Defaults to $HOME/.lighthouse/{network} where network is the value of the `network` flag Note: Users should specify @@ -202,6 +207,9 @@ OPTIONS: --debug-level Specifies the verbosity level used when emitting logs to the terminal. 
[default: info] [possible values: info, debug, trace, warn, error, crit] + --diff-buffer-cache-size + The maximum number of diff buffers to hold in memory. This cache is used when fetching historic states + [default: 16] --discovery-port The UDP port that discovery will listen on. Defaults to `port` @@ -238,6 +246,10 @@ OPTIONS: --epochs-per-migration The number of epochs to wait between running the migration of data from the hot DB to the cold DB. Less frequent runs can be useful for minimizing disk writes [default: 1] + --epochs-per-state-diff + Number of epochs between state diffs stored in the database. Lower values result in more writes and more + data stored, while higher values result in more block replaying and longer load times in case of cache miss. + [default: 16] --eth1-blocks-per-log-query Specifies the number of blocks that a deposit log query should span. This will reduce the size of responses from the Eth1 endpoint. [default: 1000] @@ -280,6 +292,13 @@ OPTIONS: --graffiti Specify your custom graffiti to be included in blocks. Defaults to the current version and commit, truncated to fit in 32 bytes. + --hierarchy-exponents + Specifies the frequency for storing full state snapshots and hierarchical diffs in the freezer DB. Accepts a + comma-separated list of ascending exponents. Each exponent defines an interval for storing diffs to the + layer above. The last exponent defines the interval for full snapshots. For example, a config of '4,8,12' + would store a full snapshot every 4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and second- + level diffs every 16 (2^4) slots. Cannot be changed after initialization. [default: + 5,9,11,13,16,18,21] --historic-state-cache-size Specifies how many states from the freezer database should cache in memory [default: 1] @@ -376,6 +395,10 @@ OPTIONS: --network-dir Data directory for network keys. Defaults to network/ inside the beacon node dir. 
+ --parallel-state-cache-size + Set the size of the cache used to de-duplicate requests for the same state. This cache is additional to + other state caches within Lighthouse and should be kept small unless a large number of parallel requests for + different states are anticipated. [default: 2] --port The TCP/UDP ports to listen on. There are two UDP ports. The discovery UDP port will be set to this value and the Quic UDP port will be set to this value + 1. The discovery port can be modified by the --discovery- @@ -462,6 +485,9 @@ OPTIONS: --slots-per-restore-point Specifies how often a freezer DB restore point should be stored. Cannot be changed after initialization. [default: 8192 (mainnet) or 64 (minimal)] + --state-cache-size + Specifies how many states the database should cache in memory [default: 128] + --suggested-fee-recipient Emergency fallback fee recipient for use in case the validator client does not have one configured. You should set this flag on the validator client instead of (or in addition to) setting it here. 
From a3bf946ea9326208f56d978422133e5b2470c41e Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 15:46:04 +1100 Subject: [PATCH 11/12] Resolve HTTP promises correctly --- .../beacon_chain/src/parallel_state_cache.rs | 2 +- beacon_node/beacon_chain/src/shuffling_cache.rs | 1 + beacon_node/http_api/src/state_id.rs | 7 ++++--- common/promise_cache/src/lib.rs | 15 +++++++++++++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs index a3c3f76fd7f..f56e3448034 100644 --- a/beacon_node/beacon_chain/src/parallel_state_cache.rs +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -1,7 +1,7 @@ use promise_cache::{PromiseCache, Protect}; use types::{BeaconState, Hash256}; -#[derive(Default)] +#[derive(Debug, Default)] pub struct ParallelStateProtector; impl Protect for ParallelStateProtector { diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 47d45074483..64b24ccd043 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -110,6 +110,7 @@ mod test { use super::*; use crate::test_utils::EphemeralHarnessType; use promise_cache::{CacheItem, PromiseCacheError}; + use std::sync::Arc; use task_executor::test_utils::null_logger; use types::*; diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index c8aaba9dc87..c4b721f0411 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -4,7 +4,6 @@ use eth2::types::StateId as CoreStateId; use slog::{debug, warn}; use std::fmt; use std::str::FromStr; -use std::sync::Arc; use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot}; /// Wraps `eth2::types::StateId` and provides common state-access functionality. 
E.g., reading @@ -244,8 +243,10 @@ impl StateId { }) })?; - // Fulfil promise. - sender.send(Arc::new(state.clone())); + // Fulfil promise (and re-lock again). + let mut state_cache = chain.parallel_state_cache.write(); + state_cache.resolve_promise(sender, state_root, &state); + drop(state_cache); Ok((state, execution_optimistic, finalized)) } diff --git a/common/promise_cache/src/lib.rs b/common/promise_cache/src/lib.rs index 9783be7cab9..36b6bd984f5 100644 --- a/common/promise_cache/src/lib.rs +++ b/common/promise_cache/src/lib.rs @@ -156,6 +156,21 @@ where } } + /// Take care of resolving a promise by ensuring the value is made available: + /// + /// 1. To all waiting threads that are holding a `Receiver`. + /// 2. In the cache itself for future callers. + pub fn resolve_promise>(&mut self, sender: Sender>, key: K, value: &C) { + // Use the sender to notify all actively waiting receivers. + let arc_value = value.to_arc(); + sender.send(arc_value.clone()); + + // Re-insert the value into the cache. The promise may have been evicted in the meantime, + // but we probably want to keep this value (which resolved recently) over other older cache + // entries. + self.insert_value(key, &arc_value); + } + /// Prunes the cache first before inserting a new item. 
fn insert_cache_item(&mut self, key: K, cache_item: CacheItem) { self.prune_cache(); From bd1758a03ee592c0534b8b8d97952c76609c6abf Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 11 Jan 2024 15:54:14 +1100 Subject: [PATCH 12/12] Use a better sort order for state cache --- beacon_node/beacon_chain/src/parallel_state_cache.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon_node/beacon_chain/src/parallel_state_cache.rs b/beacon_node/beacon_chain/src/parallel_state_cache.rs index f56e3448034..d568d3248cd 100644 --- a/beacon_node/beacon_chain/src/parallel_state_cache.rs +++ b/beacon_node/beacon_chain/src/parallel_state_cache.rs @@ -5,11 +5,11 @@ use types::{BeaconState, Hash256}; pub struct ParallelStateProtector; impl Protect for ParallelStateProtector { - type SortKey = Hash256; + type SortKey = usize; - /// Evict in arbitrary (hash) order. - fn sort_key(&self, k: &Hash256) -> Self::SortKey { - *k + /// Evict in arbitrary (hashmap) order by using the same key for every value. + fn sort_key(&self, _: &Hash256) -> Self::SortKey { + 0 } /// We don't care too much about preventing evictions of particular states here. All the states