Skip to content

Commit 2d98916

Browse files
committed
Rewrite to avoid lock contention during computation.
1 parent 5fa537b commit 2d98916

File tree

2 files changed

+70
-91
lines changed

2 files changed

+70
-91
lines changed

beacon_node/beacon_chain/src/beacon_proposer_cache.rs

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use smallvec::SmallVec;
1717
use state_processing::state_advance::partial_state_advance;
1818
use std::cmp::Ordering;
1919
use std::num::NonZeroUsize;
20+
use std::sync::Arc;
2021
use types::non_zero_usize::new_non_zero_usize;
2122
use types::{
2223
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
@@ -41,21 +42,21 @@ pub struct Proposer {
4142
/// their signatures.
4243
pub struct EpochBlockProposers {
4344
/// The epoch to which the proposers pertain.
44-
epoch: Epoch,
45+
pub epoch: Epoch,
4546
/// The fork that should be used to verify proposer signatures.
46-
fork: Fork,
47+
pub fork: Fork,
4748
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
4849
/// in that epoch.
4950
///
5051
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
51-
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
52+
pub proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
5253
}
5354

5455
/// A cache to store the proposers for some epoch.
5556
///
5657
/// See the module-level documentation for more information.
5758
pub struct BeaconProposerCache {
58-
cache: LruCache<(Epoch, Hash256), OnceCell<EpochBlockProposers>>,
59+
cache: LruCache<(Epoch, Hash256), Arc<OnceCell<EpochBlockProposers>>>,
5960
}
6061

6162
impl Default for BeaconProposerCache {
@@ -111,6 +112,17 @@ impl BeaconProposerCache {
111112
.and_then(|cache_once_cell| cache_once_cell.get().map(|proposers| &proposers.proposers))
112113
}
113114

115+
pub fn get_or_insert_key(
116+
&mut self,
117+
epoch: Epoch,
118+
shuffling_decision_block: Hash256,
119+
) -> Arc<OnceCell<EpochBlockProposers>> {
120+
let key = (epoch, shuffling_decision_block);
121+
self.cache
122+
.get_or_insert(key, || Arc::new(OnceCell::new()))
123+
.clone()
124+
}
125+
114126
/// Insert the proposers into the cache.
115127
///
116128
/// See `Self::get` for a description of `shuffling_decision_block`.
@@ -125,61 +137,16 @@ impl BeaconProposerCache {
125137
) -> Result<(), BeaconStateError> {
126138
let key = (epoch, shuffling_decision_block);
127139
if !self.cache.contains(&key) {
128-
self.cache.put(
129-
key,
130-
OnceCell::with_value(EpochBlockProposers {
131-
epoch,
132-
fork,
133-
proposers: proposers.into(),
134-
}),
135-
);
136-
}
137-
138-
Ok(())
139-
}
140-
141-
pub fn get_slot_or_insert_with<E: EthSpec, F, Err: BlockBlobError>(
142-
&mut self,
143-
shuffling_decision_block: Hash256,
144-
slot: Slot,
145-
compute_proposers_and_fork_fn: F,
146-
) -> Result<Option<Proposer>, Err>
147-
where
148-
F: FnOnce() -> Result<(Vec<usize>, Fork), Err>,
149-
{
150-
let epoch = slot.epoch(E::slots_per_epoch());
151-
let (proposers, fork) = self.get_epoch_or_insert_with(
152-
shuffling_decision_block,
153-
epoch,
154-
compute_proposers_and_fork_fn,
155-
)?;
156-
157-
Ok(proposers
158-
.get(slot.as_usize() % E::SlotsPerEpoch::to_usize())
159-
.map(|&index| Proposer { index, fork }))
160-
}
161-
162-
pub fn get_epoch_or_insert_with<F, Err: BlockBlobError>(
163-
&mut self,
164-
shuffling_decision_block: Hash256,
165-
epoch: Epoch,
166-
compute_proposers_and_fork_fn: F,
167-
) -> Result<(&SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>, Fork), Err>
168-
where
169-
F: FnOnce() -> Result<(Vec<usize>, Fork), Err>,
170-
{
171-
let key = (epoch, shuffling_decision_block);
172-
let once_cell = self.cache.get_or_insert_mut(key, OnceCell::new);
173-
let epoch_block_proposers = once_cell.get_or_try_init(|| {
174-
let (proposers, fork) = compute_proposers_and_fork_fn()?;
175-
Ok::<EpochBlockProposers, Err>(EpochBlockProposers {
140+
let epoch_proposers = EpochBlockProposers {
176141
epoch,
177142
fork,
178143
proposers: proposers.into(),
179-
})
180-
})?;
144+
};
145+
self.cache
146+
.put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
147+
}
181148

182-
Ok((&epoch_block_proposers.proposers, epoch_block_proposers.fork))
149+
Ok(())
183150
}
184151
}
185152

beacon_node/beacon_chain/src/data_column_verification.rs

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::beacon_proposer_cache::Proposer;
1+
use crate::beacon_proposer_cache::{EpochBlockProposers, Proposer};
22
use crate::block_verification::{
33
cheap_state_advance_to_obtain_committees, get_validator_pubkey_cache, process_block_slash_info,
44
BlockSlashInfo,
@@ -20,7 +20,7 @@ use tracing::debug;
2020
use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier};
2121
use types::{
2222
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
23-
RuntimeVariableList, SignedBeaconBlockHeader, Slot,
23+
RuntimeVariableList, SignedBeaconBlockHeader, Slot, Unsigned,
2424
};
2525

2626
/// An error occurred while validating a gossip data column.
@@ -572,45 +572,57 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
572572
parent_block.root
573573
};
574574

575-
let Proposer {
576-
index: proposer_index,
577-
fork,
578-
} = chain
575+
// We lock the cache briefly to get or insert a OnceCell, then drop the lock
576+
// before doing proposer shuffling calculation via `OnceCell::get_or_try_init`. This avoids
577+
// holding the lock during the computation, while still ensuring the result is cached and
578+
// initialised only once.
579+
//
580+
// This approach unfortunately exposes the cache internals (`OnceCell` & `EpochBlockProposers`)
581+
// as a trade-off for avoiding lock contention.
582+
let epoch_proposers_cell = chain
579583
.beacon_proposer_cache
580584
.lock()
581-
.get_slot_or_insert_with::<E, _, GossipDataColumnError>(
582-
proposer_shuffling_root,
585+
.get_or_insert_key(column_epoch, proposer_shuffling_root);
586+
587+
let epoch_proposers = epoch_proposers_cell.get_or_try_init(move || {
588+
debug!(
589+
%block_root,
590+
index = %column_index,
591+
"Proposer shuffling cache miss for column verification"
592+
);
593+
let (parent_state_root, mut parent_state) = chain
594+
.store
595+
.get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root)
596+
.map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))?
597+
.ok_or_else(|| {
598+
BeaconChainError::DBInconsistent(format!(
599+
"Missing state for parent block {block_parent_root:?}",
600+
))
601+
})?;
602+
603+
let state = cheap_state_advance_to_obtain_committees::<_, GossipDataColumnError>(
604+
&mut parent_state,
605+
Some(parent_state_root),
583606
column_slot,
584-
move || {
585-
debug!(
586-
%block_root,
587-
index = %column_index,
588-
"Proposer shuffling cache miss for column verification"
589-
);
590-
let (parent_state_root, mut parent_state) = chain
591-
.store
592-
.get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root)
593-
.map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))?
594-
.ok_or_else(|| {
595-
BeaconChainError::DBInconsistent(format!(
596-
"Missing state for parent block {block_parent_root:?}",
597-
))
598-
})?;
599-
600-
let state = cheap_state_advance_to_obtain_committees::<_, GossipDataColumnError>(
601-
&mut parent_state,
602-
Some(parent_state_root),
603-
column_slot,
604-
&chain.spec,
605-
)?;
606-
607-
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
608-
// Prime the proposer shuffling cache with the newly-learned value.
609-
Ok((proposers, state.fork()))
610-
},
611-
)?
607+
&chain.spec,
608+
)?;
609+
610+
let proposers = state.get_beacon_proposer_indices(&chain.spec)?;
611+
// Prime the proposer shuffling cache with the newly-learned value.
612+
Ok::<EpochBlockProposers, Err>(EpochBlockProposers {
613+
epoch: column_epoch,
614+
fork: state.fork(),
615+
proposers: proposers.into(),
616+
})
617+
})?;
618+
619+
let proposer_index = epoch_proposers
620+
.proposers
621+
.get(column_slot.as_usize() % E::SlotsPerEpoch::to_usize())
612622
.ok_or_else(|| BeaconChainError::NoProposerForSlot(column_slot))?;
613623

624+
let fork = epoch_proposers.fork;
625+
614626
// Signature verify the signed block header.
615627
let signature_is_valid = {
616628
let pubkey_cache = get_validator_pubkey_cache(chain)

0 commit comments

Comments
 (0)