Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ maplit = "1"
milhouse = "0.5"
mockito = "1.5.0"
num_cpus = "1"
once_cell = "1.17.1"
parking_lot = "0.12"
paste = "1"
prometheus = { version = "0.13", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ logging = { workspace = true }
lru = { workspace = true }
merkle_proof = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
operation_pool = { workspace = true }
parking_lot = { workspace = true }
Expand Down
49 changes: 35 additions & 14 deletions beacon_node/beacon_chain/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
Expand All @@ -39,21 +41,21 @@ pub struct Proposer {
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
epoch: Epoch,
pub(crate) epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
fork: Fork,
pub(crate) fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
pub(crate) proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}

/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
}

impl Default for BeaconProposerCache {
Expand All @@ -74,7 +76,8 @@ impl BeaconProposerCache {
) -> Option<Proposer> {
let epoch = slot.epoch(E::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
if let Some(cache) = self.cache.get(&key) {
let cache_opt = self.cache.get(&key).and_then(|cell| cell.get());
if let Some(cache) = cache_opt {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
Expand Down Expand Up @@ -103,7 +106,26 @@ impl BeaconProposerCache {
epoch: Epoch,
) -> Option<&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
let key = (epoch, shuffling_decision_block);
self.cache.get(&key).map(|cache| &cache.proposers)
self.cache
.get(&key)
.and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
}

/// Returns the `OnceCell` for the given `(epoch, shuffling_decision_block)` key,
/// inserting an empty one if it doesn't exist.
///
/// The returned `OnceCell` allows the caller to initialise the value externally
/// using `get_or_try_init`, enabling deferred computation without holding a mutable
/// reference to the cache.
pub fn get_or_insert_key(
&mut self,
epoch: Epoch,
shuffling_decision_block: Hash256,
) -> Arc<OnceCell<EpochBlockProposers>> {
let key = (epoch, shuffling_decision_block);
self.cache
.get_or_insert(key, || Arc::new(OnceCell::new()))
.clone()
}

/// Insert the proposers into the cache.
Expand All @@ -120,14 +142,13 @@ impl BeaconProposerCache {
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
self.cache.put(
key,
EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
},
);
let epoch_proposers = EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
};
self.cache
.put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
}

Ok(())
Expand Down
41 changes: 24 additions & 17 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::beacon_proposer_cache::EpochBlockProposers;
use crate::block_verification::{
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
BlockSlashInfo,
Expand Down Expand Up @@ -602,14 +603,19 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
parent_block.root
};

let proposer_opt = chain
// We lock the cache briefly to get or insert a OnceCell, then drop the lock
// before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
// holding the lock during the computation, while still ensuring the result is cached and
// initialised only once.
//
// This approach exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
// as a trade-off for avoiding lock contention.
let epoch_proposers_cell = chain
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(proposer_shuffling_root, column_slot);
.get_or_insert_key(column_epoch, proposer_shuffling_root);

let (proposer_index, fork) = if let Some(proposer) = proposer_opt {
(proposer.index, proposer.fork)
} else {
let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
debug!(
%block_root,
index = %column_index,
Expand All @@ -633,19 +639,20 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
)?;

let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
let proposer_index = *proposers
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;

// Prime the proposer shuffling cache with the newly-learned value.
chain.beacon_proposer_cache.lock().insert(
column_epoch,
proposer_shuffling_root,
proposers,
state.fork(),
)?;
(proposer_index, state.fork())
};
Ok::<_, GossipDataColumnError>(EpochBlockProposers {
epoch: column_epoch,
fork: state.fork(),
proposers: proposers.into(),
})
})?;

let proposer_index = *epoch_proposers
.proposers
.get(column_slot.as_usize() % T::EthSpec::slots_per_epoch() as usize)
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;

let fork = epoch_proposers.fork;

// Signature verify the signed block header.
let signature_is_valid = {
Expand Down
2 changes: 0 additions & 2 deletions common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ test_logger = [] # Print log output to stderr when running tests instead of drop
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
logroller = { workspace = true }
metrics = { workspace = true }
once_cell = "1.17.1"
parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [ "time" ] }
Expand Down
Loading