Skip to content

Commit 2f456ff

Browse files
committed
Fix regression in DB write atomicity (#3931)
## Issue Addressed Fix a bug introduced by #3696. The bug is not expected to occur frequently, so releasing this PR is non-urgent. ## Proposed Changes * Add a variant to `StoreOp` that allows a raw KV operation to be passed around. * Return to using `self.store.do_atomically` rather than `self.store.hot_db.do_atomically`. This streamlines the write back into a single call and makes our auto-revert work again. * Prevent `import_block_update_shuffling_cache` from failing block import. This is an outstanding bug from before v3.4.0 which may have contributed to some random unexplained database corruption. ## Additional Info In #3696 I split the database write into two calls, one to convert the `StoreOp`s to `KeyValueStoreOp`s and one to write them. This had the unfortunate side-effect of damaging our atomicity guarantees in case of a write error. If the first call failed, we would be left with the block in fork choice but not on-disk (or the snapshot cache), which would prevent us from processing any descendant blocks. On `unstable` the first call is very unlikely to fail unless the disk is full, but on `tree-states` the conversion is more involved and a user reported database corruption after it failed in a way that should have been recoverable. Additionally, as @emhane observed, #3696 also inadvertently removed the import of the new block into the block cache. Although this seems like it could have negatively impacted performance, there are several mitigating factors: - For regular block processing we should almost always load the parent block (and state) from the snapshot cache. - We often load blinded blocks, which bypass the block cache anyway. - Metrics show no noticeable increase in the block cache miss rate with v3.4.0. However, I expect the block cache _will_ be useful again in `tree-states`, so it is restored to use by this PR.
1 parent 84843d6 commit 2f456ff

File tree

4 files changed

+47
-17
lines changed

4 files changed

+47
-17
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2714,7 +2714,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
27142714
// is so we don't have to think about lock ordering with respect to the fork choice lock.
27152715
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
27162716
// would be difficult to check that they all lock fork choice first.
2717-
let mut kv_store_ops = self
2717+
let mut ops = self
27182718
.validator_pubkey_cache
27192719
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
27202720
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@@ -2816,9 +2816,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28162816
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
28172817
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
28182818
// cache above. Resume non-essential processing.
2819+
//
2820+
// It is important NOT to return errors here before the database commit, because the block
2821+
// has already been added to fork choice and the database would be left in an inconsistent
2822+
// state if we returned early without committing. In other words, an error here would
2823+
// corrupt the node's database permanently.
28192824
// -----------------------------------------------------------------------------------------
28202825

2821-
self.import_block_update_shuffling_cache(block_root, &mut state)?;
2826+
self.import_block_update_shuffling_cache(block_root, &mut state);
28222827
self.import_block_observe_attestations(
28232828
block,
28242829
&state,
@@ -2841,17 +2846,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28412846
// If the write fails, revert fork choice to the version from disk, else we can
28422847
// end up with blocks in fork choice that are missing from disk.
28432848
// See https://github.com/sigp/lighthouse/issues/2028
2844-
let mut ops: Vec<_> = confirmed_state_roots
2845-
.into_iter()
2846-
.map(StoreOp::DeleteStateTemporaryFlag)
2847-
.collect();
2849+
ops.extend(
2850+
confirmed_state_roots
2851+
.into_iter()
2852+
.map(StoreOp::DeleteStateTemporaryFlag),
2853+
);
28482854
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
28492855
ops.push(StoreOp::PutState(block.state_root(), &state));
28502856
let txn_lock = self.store.hot_db.begin_rw_transaction();
28512857

2852-
kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
2853-
2854-
if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
2858+
if let Err(e) = self.store.do_atomically(ops) {
28552859
error!(
28562860
self.log,
28572861
"Database write failed!";
@@ -3280,13 +3284,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32803284
}
32813285
}
32823286

3287+
// For the current and next epoch of this state, ensure we have the shuffling from this
3288+
// block in our cache.
32833289
fn import_block_update_shuffling_cache(
32843290
&self,
32853291
block_root: Hash256,
32863292
state: &mut BeaconState<T::EthSpec>,
3293+
) {
3294+
if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
3295+
warn!(
3296+
self.log,
3297+
"Failed to prime shuffling cache";
3298+
"error" => ?e
3299+
);
3300+
}
3301+
}
3302+
3303+
fn import_block_update_shuffling_cache_fallible(
3304+
&self,
3305+
block_root: Hash256,
3306+
state: &mut BeaconState<T::EthSpec>,
32873307
) -> Result<(), BlockError<T::EthSpec>> {
3288-
// For the current and next epoch of this state, ensure we have the shuffling from this
3289-
// block in our cache.
32903308
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
32913309
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
32923310

beacon_node/beacon_chain/src/validator_pubkey_cache.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
44
use std::collections::HashMap;
55
use std::convert::TryInto;
66
use std::marker::PhantomData;
7-
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem};
7+
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
88
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
99

1010
/// Provides a mapping of `validator_index -> validator_publickey`.
@@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
3838
};
3939

4040
let store_ops = cache.import_new_pubkeys(state)?;
41-
store.hot_db.do_atomically(store_ops)?;
41+
store.do_atomically(store_ops)?;
4242

4343
Ok(cache)
4444
}
@@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
7979
pub fn import_new_pubkeys(
8080
&mut self,
8181
state: &BeaconState<T::EthSpec>,
82-
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
82+
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
8383
if state.validators().len() > self.pubkeys.len() {
8484
self.import(
8585
state.validators()[self.pubkeys.len()..]
@@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
9292
}
9393

9494
/// Adds zero or more validators to `self`.
95-
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
95+
fn import<I>(
96+
&mut self,
97+
validator_keys: I,
98+
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
9699
where
97100
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
98101
{
@@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
112115
// It will be committed atomically when the block that introduced it is written to disk.
113116
// Notably it is NOT written while the write lock on the cache is held.
114117
// See: https://github.com/sigp/lighthouse/issues/2327
115-
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));
118+
store_ops.push(StoreOp::KeyValueOp(
119+
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
120+
));
116121

117122
self.pubkeys.push(
118123
(&pubkey)
@@ -294,7 +299,7 @@ mod test {
294299
let ops = cache
295300
.import_new_pubkeys(&state)
296301
.expect("should import pubkeys");
297-
store.hot_db.do_atomically(ops).unwrap();
302+
store.do_atomically(ops).unwrap();
298303
check_cache_get(&cache, &keypairs[..]);
299304
drop(cache);
300305

beacon_node/store/src/hot_cold_store.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
727727
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes());
728728
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
729729
}
730+
731+
StoreOp::KeyValueOp(kv_op) => {
732+
key_value_batch.push(kv_op);
733+
}
730734
}
731735
}
732736
Ok(key_value_batch)
@@ -758,6 +762,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
758762
StoreOp::DeleteState(_, _) => (),
759763

760764
StoreOp::DeleteExecutionPayload(_) => (),
765+
766+
StoreOp::KeyValueOp(_) => (),
761767
}
762768
}
763769

beacon_node/store/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ pub enum StoreOp<'a, E: EthSpec> {
161161
DeleteBlock(Hash256),
162162
DeleteState(Hash256, Option<Slot>),
163163
DeleteExecutionPayload(Hash256),
164+
KeyValueOp(KeyValueStoreOp),
164165
}
165166

166167
/// A unique column identifier.

0 commit comments

Comments
 (0)