From 2bd784ef688fba8128ccf9ee20e33b41097fefcb Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 16 Sep 2022 17:32:22 +1000 Subject: [PATCH 1/4] Work in progress block separation --- Cargo.lock | 42 +++++++ beacon_node/beacon_chain/src/beacon_chain.rs | 10 +- .../src/beacon_fork_choice_store.rs | 2 +- beacon_node/beacon_chain/src/builder.rs | 4 +- .../beacon_chain/src/canonical_head.rs | 4 +- beacon_node/beacon_chain/src/fork_revert.rs | 2 +- .../beacon_chain/src/historical_blocks.rs | 8 +- beacon_node/beacon_chain/src/migrate.rs | 2 +- .../src/pre_finalization_cache.rs | 2 +- .../src/schema_change/migration_schema_v12.rs | 2 +- .../src/schema_change/migration_schema_v9.rs | 2 +- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/config.rs | 6 + beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/hot_cold_store.rs | 103 +++++++++++++++++- beacon_node/store/src/impls.rs | 1 + .../store/src/impls/frozen_block_slot.rs | 19 ++++ beacon_node/store/src/iter.rs | 7 +- beacon_node/store/src/lib.rs | 11 ++ beacon_node/store/src/metadata.rs | 2 +- beacon_node/store/src/reconstruct.rs | 2 +- 21 files changed, 204 insertions(+), 29 deletions(-) create mode 100644 beacon_node/store/src/impls/frozen_block_slot.rs diff --git a/Cargo.lock b/Cargo.lock index 58c9ec2a728..58f077e35ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -733,6 +733,9 @@ name = "cc" version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -3023,6 +3026,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.59" @@ -6338,6 +6350,7 @@ dependencies = [ "strum", "tempfile", "types", + "zstd", ] [[package]] @@ -7919,3 +7932,32 @@ dependencies = [ "thiserror", "time 0.1.44", ] + +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +dependencies = [ + "cc", + "libc", +] diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 077b425c073..5180f3bae51 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -693,7 +693,9 @@ impl BeaconChain { let root = self.block_root_at_slot(request_slot, skips)?; if let Some(block_root) = root { - Ok(self.store.get_blinded_block(&block_root)?) + Ok(self + .store + .get_blinded_block(&block_root, Some(request_slot))?) 
} else { Ok(None) } @@ -919,7 +921,7 @@ impl BeaconChain { ) -> Result>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. - let blinded_block = match self.store.try_get_full_block(block_root)? { + let blinded_block = match self.store.try_get_full_block(block_root, None)? { Some(DatabaseBlock::Full(block)) => return Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => block, None => return Ok(None), @@ -975,7 +977,7 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - Ok(self.store.get_blinded_block(block_root)?) + Ok(self.store.get_blinded_block(block_root, None)?) } /// Returns the state at the given root, if any. @@ -4629,7 +4631,7 @@ impl BeaconChain { let beacon_block = self .store - .get_blinded_block(&beacon_block_root)? + .get_blinded_block(&beacon_block_root, None)? .ok_or_else(|| { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 4f6003fda1b..55502419c04 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -322,7 +322,7 @@ where metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let justified_block = self .store - .get_blinded_block(&self.justified_checkpoint.root) + .get_blinded_block(&self.justified_checkpoint.root, None) .map_err(Error::FailedToReadBlock)? .ok_or(Error::MissingBlock(self.justified_checkpoint.root))? .deconstruct() diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a578629b690..424013907f5 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -256,7 +256,7 @@ where .ok_or("Fork choice not found in store")?; let genesis_block = store - .get_blinded_block(&chain.genesis_block_root) + .get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0))) .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; let genesis_state = store @@ -618,7 +618,7 @@ where // Try to decode the head block according to the current fork, if that fails, try // to backtrack to before the most recent fork. let (head_block_root, head_block, head_reverted) = - match store.get_full_block(&initial_head_block_root) { + match store.get_full_block(&initial_head_block_root, None) { Ok(Some(block)) => (initial_head_block_root, block, false), Ok(None) => return Err("Head block not found in store".into()), Err(StoreError::SszDecodeError(_)) => { diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index c9bd6db0e67..b482b99b357 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -269,7 +269,7 @@ impl CanonicalHead { let fork_choice_view = fork_choice.cached_fork_choice_view(); let beacon_block_root = fork_choice_view.head_block_root; let beacon_block = store - .get_full_block(&beacon_block_root)? + .get_full_block(&beacon_block_root, None)? .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root(); let beacon_state = store @@ -639,7 +639,7 @@ impl BeaconChain { .unwrap_or_else(|| { let beacon_block = self .store - .get_full_block(&new_view.head_block_root)? + .get_full_block(&new_view.head_block_root, None)? 
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?; let beacon_state_root = beacon_block.state_root(); diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 654b2713b1c..8548650ce57 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -107,7 +107,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It let finalized_checkpoint = head_state.finalized_checkpoint(); let finalized_block_root = finalized_checkpoint.root; let finalized_block = store - .get_full_block(&finalized_block_root) + .get_full_block(&finalized_block_root, None) .map_err(|e| format!("Error loading finalized block: {:?}", e))? .ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index cc45a6bb9a9..f6c10958f28 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -93,7 +93,6 @@ impl BeaconChain { ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; let mut cold_batch = Vec::with_capacity(blocks.len()); - let mut hot_batch = Vec::with_capacity(blocks.len()); for block in blocks_to_import.iter().rev() { // Check chain integrity. @@ -109,7 +108,7 @@ impl BeaconChain { // Store block in the hot database without payload. self.store - .blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch); + .blinded_block_as_cold_kv_store_ops(&block_root, block, &mut cold_batch); // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { @@ -177,10 +176,7 @@ impl BeaconChain { drop(verify_timer); drop(sig_timer); - // Write the I/O batches to disk, writing the blocks themselves first, as it's better - // for the hot DB to contain extra blocks than for the cold DB to point to blocks that - // do not exist. - self.store.hot_db.do_atomically(hot_batch)?; + // Write the I/O batch to disk. self.store.cold_db.do_atomically(cold_batch)?; // Update the anchor. diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 1c0d9c4ed31..bd329a09636 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -424,7 +424,7 @@ impl, Cold: ItemStore> BackgroundMigrator block.state_root(), Ok(None) => { return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) diff --git a/beacon_node/beacon_chain/src/pre_finalization_cache.rs b/beacon_node/beacon_chain/src/pre_finalization_cache.rs index 112394bb184..ca957af2135 100644 --- a/beacon_node/beacon_chain/src/pre_finalization_cache.rs +++ b/beacon_node/beacon_chain/src/pre_finalization_cache.rs @@ -71,7 +71,7 @@ impl BeaconChain { } // 2. Check on disk. - if self.store.get_blinded_block(&block_root)?.is_some() { + if self.store.get_blinded_block(&block_root, None)?.is_some() { cache.block_roots.put(block_root, ()); return Ok(true); } diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs index bb72b28c0ec..c29c881ffaf 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v12.rs @@ -39,7 +39,7 @@ pub fn upgrade_to_v12( .unrealized_justified_checkpoint .root; let justified_block = db - .get_blinded_block(&justified_block_root)? 
+ .get_blinded_block(&justified_block_root, None)? .ok_or_else(|| { Error::SchemaMigrationError(format!( "unrealized justified block missing for migration: {justified_block_root:?}", diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs index e2c48d5c89d..2135e5f689b 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs @@ -91,7 +91,7 @@ pub fn upgrade_to_v9( Ok(None) => return Err(Error::BlockNotFound(block_root)), // There was an error reading a pre-v9 block. Try reading it as a post-v9 block. Err(_) => { - if db.try_get_full_block(&block_root)?.is_some() { + if db.try_get_full_block(&block_root, None)?.is_some() { // The block is present as a post-v9 block, assume that it was already // correctly migrated. continue; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 20ae37b3b14..9cabc1c68be 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -26,3 +26,4 @@ lru = "0.7.1" sloggers = { version = "2.1.1", features = ["json"] } directory = { path = "../../common/directory" } strum = { version = "0.24.0", features = ["derive"] } +zstd = "0.11.0" diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 4268ec2e915..81ea1990b0c 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -21,12 +21,16 @@ pub struct StoreConfig { pub compact_on_init: bool, /// Whether to compact the database during database pruning. pub compact_on_prune: bool, + /// Whether to store finalized blocks in the freezer database. + pub separate_blocks: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. 
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] pub struct OnDiskStoreConfig { pub slots_per_restore_point: u64, + // FIXME(sproul): schema migration + pub separate_blocks: bool, } #[derive(Debug, Clone)] @@ -43,6 +47,7 @@ impl Default for StoreConfig { block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, + separate_blocks: true, } } } @@ -51,6 +56,7 @@ impl StoreConfig { pub fn as_disk_config(&self) -> OnDiskStoreConfig { OnDiskStoreConfig { slots_per_restore_point: self.slots_per_restore_point, + separate_blocks: self.separate_blocks, } } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 30ee66074f8..ad6d3100140 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -41,6 +41,7 @@ pub enum Error { computed: Hash256, }, BlockReplayError(BlockReplayError), + Compression(std::io::Error), AddPayloadLogicError, ResyncRequiredForExecutionPayloadSeparation, SlotClockUnavailableForMigration, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c4b4a64a057..57b5eaf27fa 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -6,7 +6,10 @@ use crate::config::{ PREV_DEFAULT_SLOTS_PER_RESTORE_POINT, }; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; -use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::impls::{ + beacon_state::{get_full_state, store_full_state}, + frozen_block_slot::FrozenBlockSlot, +}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; @@ -33,11 +36,13 @@ use state_processing::{ }; use std::cmp::min; use std::convert::TryInto; +use std::io::{Read, Write}; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; +use zstd::{Decoder, Encoder}; /// On-disk database that stores finalized states efficiently. /// @@ -92,6 +97,7 @@ pub enum HotColdDBError { MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, + MissingFrozenBlockSlot(Hash256), HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), @@ -318,6 +324,7 @@ impl, Cold: ItemStore> HotColdDB pub fn try_get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); @@ -328,7 +335,7 @@ impl, Cold: ItemStore> HotColdDB } // Load the blinded block. - let blinded_block = match self.get_blinded_block(block_root)? { + let blinded_block = match self.get_blinded_block(block_root, slot)? { Some(block) => block, None => return Ok(None), }; @@ -360,8 +367,9 @@ impl, Cold: ItemStore> HotColdDB pub fn get_full_block( &self, block_root: &Hash256, + slot: Option, ) -> Result>, Error> { - match self.try_get_full_block(block_root)? { + match self.try_get_full_block(block_root, slot)? { Some(DatabaseBlock::Full(block)) => Ok(Some(block)), Some(DatabaseBlock::Blinded(block)) => Err( HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot()) @@ -399,12 +407,98 @@ impl, Cold: ItemStore> HotColdDB pub fn get_blinded_block( &self, block_root: &Hash256, - ) -> Result>>, Error> { + slot: Option, + ) -> Result>, Error> { + if let Some(slot) = slot { + if slot < self.get_split_slot() { + // To the freezer DB. 
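+                // Blocks at slots before the split are keyed by slot in the freezer's
+                // `BeaconBlockFrozen` column, so a slot hint lets the read bypass the hot DB.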
+ self.get_cold_blinded_block_by_slot(slot) + } else { + self.get_hot_blinded_block(block_root) + } + } else { + match self.get_hot_blinded_block(block_root)? { + Some(block) => Ok(Some(block)), + None => self.get_cold_blinded_block_by_root(block_root), + } + } + } + + pub fn get_hot_blinded_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { self.get_block_with(block_root, |bytes| { SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) }) } + pub fn get_cold_blinded_block_by_root( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // Load slot. + if let Some(FrozenBlockSlot(block_slot)) = self.cold_db.get(block_root)? { + self.get_cold_blinded_block_by_slot(block_slot) + } else { + Ok(None) + } + } + + pub fn get_cold_blinded_block_by_slot( + &self, + slot: Slot, + ) -> Result>, Error> { + let bytes = if let Some(bytes) = self.cold_db.get_bytes( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + )? { + bytes + } else { + return Ok(None); + }; + + // FIXME(sproul): dodgy compression factor estimation + let mut ssz_bytes = Vec::with_capacity(2 * bytes.len()); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(Some(SignedBeaconBlock::from_ssz_bytes( + &ssz_bytes, &self.spec, + )?)) + } + + pub fn blinded_block_as_cold_kv_store_ops( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + kv_store_ops: &mut Vec, + ) -> Result<(), Error> { + // Write the block root to slot mapping. + let slot = block.slot(); + kv_store_ops.push(FrozenBlockSlot(slot).as_kv_store_op(*block_root)); + + // Write the block keyed by slot. + let db_key = get_key_for_col( + DBColumn::BeaconBlockFrozen.into(), + &slot.as_u64().to_be_bytes(), + ); + + // FIXME(sproul): fix compression estimate and level + let compression_level = 3; + let ssz_bytes = block.as_ssz_bytes(); + let mut compressed_value = Vec::with_capacity(ssz_bytes.len() / 2); + let mut encoder = + Encoder::new(&mut compressed_value, compression_level).map_err(Error::Compression)?; + encoder.write_all(&ssz_bytes).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + + kv_store_ops.push(KeyValueStoreOp::PutKeyValue(db_key, compressed_value)); + + Ok(()) + } + /// Fetch a block from the store, ignoring which fork variant it *should* be for. pub fn get_block_any_variant>( &self, @@ -1455,6 +1549,7 @@ pub fn migrate_database, Cold: ItemStore>( } let mut hot_db_ops: Vec> = Vec::new(); + let mut cold_db_block_ops: Vec = vec![]; // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. 
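The freezer-block values introduced above are plain SSZ bytes wrapped in a zstd stream. A minimal stand-alone sketch of that round trip, using the same `Encoder`/`Decoder` API from the `zstd` 0.11 crate as the diff (the helper names and capacity guesses are illustrative, not part of the patch):

use std::io::{Read, Write};
use zstd::{Decoder, Encoder};

/// Compress SSZ-encoded block bytes, mirroring `blinded_block_as_cold_kv_store_ops`.
fn compress_block_bytes(ssz_bytes: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
    // Pre-size assuming roughly 2x compression (the diff marks this estimate as a FIXME).
    let mut compressed = Vec::with_capacity(ssz_bytes.len() / 2);
    let mut encoder = Encoder::new(&mut compressed, level)?;
    encoder.write_all(ssz_bytes)?;
    encoder.finish()?;
    Ok(compressed)
}

/// Decompress a value read from the `BeaconBlockFrozen` column back into SSZ bytes.
fn decompress_block_bytes(compressed: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut ssz_bytes = Vec::with_capacity(2 * compressed.len());
    let mut decoder = Decoder::new(compressed)?;
    decoder.read_to_end(&mut ssz_bytes)?;
    Ok(ssz_bytes)
}

Under this sketch the slot-keyed value written by the patch corresponds to `compress_block_bytes(&block.as_ssz_bytes(), 3)`, with level 3 as the hard-coded default.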
diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs index 736585a72aa..b2af9a408ef 100644 --- a/beacon_node/store/src/impls.rs +++ b/beacon_node/store/src/impls.rs @@ -1,2 +1,3 @@ pub mod beacon_state; pub mod execution_payload; +pub mod frozen_block_slot; diff --git a/beacon_node/store/src/impls/frozen_block_slot.rs b/beacon_node/store/src/impls/frozen_block_slot.rs new file mode 100644 index 00000000000..67d11b4f081 --- /dev/null +++ b/beacon_node/store/src/impls/frozen_block_slot.rs @@ -0,0 +1,19 @@ +use crate::{DBColumn, Error, StoreItem}; +use ssz::{Decode, Encode}; +use types::Slot; + +pub struct FrozenBlockSlot(pub Slot); + +impl StoreItem for FrozenBlockSlot { + fn db_column() -> DBColumn { + DBColumn::BeaconBlock + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(FrozenBlockSlot(Slot::from_ssz_bytes(bytes)?)) + } +} diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 07c99e5a4ef..b073a76e17d 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -189,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, T, block_hash: Hash256, ) -> Result { let block = store - .get_blinded_block(&block_hash)? + .get_blinded_block(&block_hash, None)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; let state = store .get_state(&block.state_root(), Some(block.slot()))? @@ -286,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> let block = if self.decode_any_variant { self.store.get_block_any_variant(&block_root) } else { - self.store.get_blinded_block(&block_root) + self.store.get_blinded_block(&block_root, None) }? .ok_or(Error::BlockNotFound(block_root))?; self.next_block_root = block.message().parent_root(); @@ -329,7 +329,8 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, fn do_next(&mut self) -> Result>>, Error> { if let Some(result) = self.roots.next() { let (root, _slot) = result?; - self.roots.inner.store.get_blinded_block(&root) + // Don't use slot hint here as it could be a skipped slot. + self.roots.inner.store.get_blinded_block(&root, None) } else { Ok(None) } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 75aeca058b5..40226ade030 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -169,8 +169,19 @@ pub enum DBColumn { /// For data related to the database itself. #[strum(serialize = "bma")] BeaconMeta, + /// Data related to blocks. + /// + /// - Key: `Hash256` block root. + /// - Value in hot DB: SSZ-encoded blinded block. + /// - Value in cold DB: 8-byte slot of block. #[strum(serialize = "blk")] BeaconBlock, + /// Frozen beacon blocks. + /// + /// - Key: 8-byte slot. + /// - Value: ZSTD-compressed SSZ-encoded blinded block. + #[strum(serialize = "bbf")] + BeaconBlockFrozen, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). 
#[strum(serialize = "ste")] BeaconState, diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 4f35c4c0728..9ca28ecc8cb 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(12); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8000); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 7db2652f2cd..7c654d3df56 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -76,7 +76,7 @@ where None } else { Some( - self.get_blinded_block(&block_root)? + self.get_blinded_block(&block_root, Some(slot))? .ok_or(Error::BlockNotFound(block_root))?, ) }; From 0c75da5a01c12b392e9b5c65a9191c2fd27989b1 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 16 Sep 2022 17:36:06 +1000 Subject: [PATCH 2/4] Squash merge of 3565 Squashed commit of the following: commit a4960ebfd793582f811690216c68b53698e722bf Author: Michael Sproul Date: Mon Sep 12 12:10:23 2022 +1000 Clippy commit b28e8d0848d1d347f9b4cdd1de9fe26954ddc8dc Author: Michael Sproul Date: Mon Sep 12 11:41:45 2022 +1000 Add flag to disable prune on startup commit de775d6aa5786a2b6bf55969fcb0cc6d3dc8fb7a Author: Michael Sproul Date: Mon Sep 12 11:19:21 2022 +1000 Fix and update beacon chain tests commit 2289b20bca69ef619fd34b6e658f8f6e99f9180d Author: Michael Sproul Date: Fri Sep 9 17:40:21 2022 +1000 Implement DB manager command commit d5adc2ebc52fee73fed0bd23d0eeec0827bd3a96 Author: Michael Sproul Date: Fri Sep 9 12:56:27 2022 +1000 Implement on-demand pruning operation commit 69d54741c15e92a0a8b2bf1e2d0b82e6d19a4474 Author: Michael Sproul Date: Thu Sep 8 16:25:04 2022 +1000 Delete finalized exec payloads while running --- beacon_node/beacon_chain/src/builder.rs | 7 ++ .../beacon_chain/tests/block_verification.rs | 33 +++--- beacon_node/beacon_chain/tests/store_tests.rs | 29 ++++- beacon_node/src/cli.rs | 7 ++ beacon_node/src/config.rs | 6 + beacon_node/store/src/config.rs | 3 + beacon_node/store/src/hot_cold_store.rs | 110 +++++++++++++++++- database_manager/src/lib.rs | 32 +++++ lighthouse/tests/beacon_node.rs | 13 +++ 9 files changed, 211 insertions(+), 29 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 424013907f5..2c5a68388ef 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -266,6 +266,13 @@ where self.genesis_time = Some(genesis_state.genesis_time()); + // Prune finalized execution payloads. 
+ if store.get_config().prune_payloads_on_init { + store + .try_prune_execution_payloads(false) + .map_err(|e| format!("Error pruning execution payloads: {e:?}"))?; + } + self.op_pool = Some( store .get_item::>(&OP_POOL_DB_KEY) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index c2283321cbe..17c84bd6971 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec> { ) .await; - harness + let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH); + for snapshot in harness .chain .chain_dump() .expect("should dump chain") .into_iter() - .map(|snapshot| { - let full_block = harness - .chain - .store - .make_full_block( - &snapshot.beacon_block_root, - snapshot.beacon_block.as_ref().clone(), - ) - .unwrap(); - BeaconSnapshot { - beacon_block_root: snapshot.beacon_block_root, - beacon_block: Arc::new(full_block), - beacon_state: snapshot.beacon_state, - } - }) .skip(1) - .collect() + { + let full_block = harness + .chain + .get_block(&snapshot.beacon_block_root) + .await + .unwrap() + .unwrap(); + segment.push(BeaconSnapshot { + beacon_block_root: snapshot.beacon_block_root, + beacon_block: Arc::new(full_block), + beacon_state: snapshot.beacon_state, + }); + } + segment } fn get_harness(validator_count: usize) -> BeaconChainHarness> { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index afd97750a6f..b85ff50efb7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2114,14 +2114,16 @@ async fn weak_subjectivity_sync() { assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1); for snapshot in new_blocks { - let block = &snapshot.beacon_block; let full_block = harness .chain - .store - .make_full_block(&snapshot.beacon_block_root, block.as_ref().clone()) + .get_block(&snapshot.beacon_block_root) + .await + .unwrap() .unwrap(); + let slot = full_block.slot(); + let state_root = full_block.state_root(); - beacon_chain.slot_clock.set_slot(block.slot().as_u64()); + beacon_chain.slot_clock.set_slot(slot.as_u64()); beacon_chain .process_block(Arc::new(full_block), CountUnrealized::True) .await @@ -2129,10 +2131,9 @@ async fn weak_subjectivity_sync() { beacon_chain.recompute_head_at_current_slot().await; // Check that the new block's state can be loaded correctly. - let state_root = block.state_root(); let mut state = beacon_chain .store - .get_state(&state_root, Some(block.slot())) + .get_state(&state_root, Some(slot)) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); @@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc, L /// Check that all the states in a chain dump have the correct tree hash. fn check_chain_dump(harness: &TestHarness, expected_len: u64) { let chain_dump = harness.chain.chain_dump().unwrap(); + let split_slot = harness.chain.store.get_split_slot(); assert_eq!(chain_dump.len() as u64, expected_len); @@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { .slot(), checkpoint.beacon_state.slot() ); + + // Check presence of execution payload on disk. 
+ if harness.chain.spec.bellatrix_fork_epoch.is_some() { + assert_eq!( + harness + .chain + .store + .execution_payload_exists(&checkpoint.beacon_block_root) + .unwrap(), + checkpoint.beacon_block.slot() >= split_slot, + "incorrect payload storage for block at slot {}: {:?}", + checkpoint.beacon_block.slot(), + checkpoint.beacon_block_root, + ); + } } // Check the forwards block roots iterator against the chain dump diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 3c421a1a3f9..d9d5c715b48 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("true") ) + .arg( + Arg::with_name("prune-payloads-on-startup") + .long("prune-payloads-on-startup") + .help("Check for execution payloads to prune on start-up.") + .takes_value(true) + .default_value("true") + ) /* * Misc. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index b57ba02687d..368fce573d2 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -358,6 +358,12 @@ pub fn get_config( .map_err(|_| "auto-compact-db takes a boolean".to_string())?; } + if let Some(prune_payloads_on_init) = + clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")? + { + client_config.store.prune_payloads_on_init = prune_payloads_on_init; + } + /* * Zero-ports * diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 81ea1990b0c..d0a78d53250 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -23,6 +23,8 @@ pub struct StoreConfig { pub compact_on_prune: bool, /// Whether to store finalized blocks in the freezer database. pub separate_blocks: bool, + /// Whether to try pruning execution payloads on initialization. + pub prune_payloads_on_init: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -48,6 +50,7 @@ impl Default for StoreConfig { compact_on_init: false, compact_on_prune: true, separate_blocks: true, + prune_payloads_on_init: true, } } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 57b5eaf27fa..b79b9214317 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -10,7 +10,7 @@ use crate::impls::{ beacon_state::{get_full_state, store_full_state}, frozen_block_slot::FrozenBlockSlot, }; -use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; @@ -532,6 +532,12 @@ impl, Cold: ItemStore> HotColdDB .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into()) } + /// Check if the execution payload for a block exists on disk. + pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result { + self.get_item::>(block_root) + .map(|payload| payload.is_some()) + } + /// Determine whether a block exists in the database. pub fn block_exists(&self, block_root: &Hash256) -> Result { self.hot_db @@ -1512,6 +1518,93 @@ impl, Cold: ItemStore> HotColdDB &CompactionTimestamp(compaction_timestamp.as_secs()), ) } + + /// Try to prune all execution payloads, returning early if there is no need to prune. 
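+    ///
+    /// With `force == false` this returns early once the split block's parent has no stored
+    /// payload (i.e. pruning has already run); `force == true` skips that check and is used
+    /// by the manual `prune_payloads` database-manager command.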
+ pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> { + let split = self.get_split_info(); + + if split.slot == 0 { + return Ok(()); + } + + let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch { + epoch.start_slot(E::slots_per_epoch()) + } else { + return Ok(()); + }; + + // Load the split state so we can backtrack to find execution payloads. + let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( + HotColdDBError::MissingSplitState(split.state_root, split.slot), + )?; + + // The finalized block may or may not have its execution payload stored, depending on + // whether it was at a skipped slot. However for a fully pruned database its parent + // should *always* have been pruned. + let split_parent_block_root = split_state.get_block_root(split.slot - 1)?; + if !self.execution_payload_exists(split_parent_block_root)? && !force { + info!(self.log, "Execution payloads are pruned"); + return Ok(()); + } + + // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes + // first. + let split_block_root = split_state.get_latest_block_root(split.state_root); + let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot); + + let mut ops = vec![]; + + for res in std::iter::once(Ok((split_block_root, split.slot))) + .chain(BlockRootsIterator::new(self, &split_state)) + { + let (block_root, slot) = match res { + Ok(tuple) => tuple, + Err(e) => { + warn!( + self.log, + "Stopping backtrack early"; + "error" => ?e, + ); + break; + } + }; + + if slot < bellatrix_fork_slot { + info!( + self.log, + "Finished backtrack to Bellatrix fork"; + ); + break; + } + + if self.execution_payload_exists(&block_root)? { + debug!( + self.log, + "Pruning execution payload"; + "slot" => slot, + "block_root" => ?block_root, + ); + ops.push(StoreOp::DeleteExecutionPayload(block_root)); + } + + if Some(slot) == anchor_slot { + info!( + self.log, + "Finished backtrack to anchor state"; + "slot" => slot + ); + break; + } + } + let payloads_pruned = ops.len(); + self.do_atomically(ops)?; + info!( + self.log, + "Execution payload pruning complete"; + "payloads_pruned" => payloads_pruned, + ); + Ok(()) + } } /// Advance the split point of the store, moving new finalized states to the freezer. @@ -1552,16 +1645,16 @@ pub fn migrate_database, Cold: ItemStore>( let mut cold_db_block_ops: Vec = vec![]; // 1. Copy all of the states between the head and the split slot, from the hot DB - // to the cold DB. - let state_root_iter = StateRootsIterator::new(&store, frozen_head); - for maybe_pair in state_root_iter.take_while(|result| match result { - Ok((_, slot)) => { + // to the cold DB. Delete the execution payloads of these now-finalized blocks. + let state_root_iter = RootsIterator::new(&store, frozen_head); + for maybe_tuple in state_root_iter.take_while(|result| match result { + Ok((_, _, slot)) => { slot >= ¤t_split_slot && anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot) } Err(_) => true, }) { - let (state_root, slot) = maybe_pair?; + let (block_root, state_root, slot) = maybe_tuple?; let mut cold_db_ops: Vec = Vec::new(); @@ -1584,6 +1677,11 @@ pub fn migrate_database, Cold: ItemStore>( // Delete the old summary, and the full state if we lie on an epoch boundary. hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); + + // Delete the execution payload. 
Even if this execution payload is the payload of the + // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split + // slot when determining whether to reconstruct payloads. + hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); } // Warning: Critical section. We have to take care not to put any of the two databases in an diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 50295df4b0e..20147adb9f1 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { ) } +pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> { + App::new("prune_payloads") + .setting(clap::AppSettings::ColoredHelp) + .about("Prune finalized execution payloads") +} + pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new(CMD) .visible_aliases(&["db"]) @@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .subcommand(migrate_cli_app()) .subcommand(version_cli_app()) .subcommand(inspect_cli_app()) + .subcommand(prune_payloads_app()) } fn parse_client_config( @@ -257,6 +264,30 @@ pub fn migrate_db( ) } +pub fn prune_payloads( + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log, + )?; + + // If we're trigging a prune manually then ignore the check on the split's parent that bails + // out early. + let force = true; + db.try_prune_execution_payloads(force) +} + /// Run the database manager, returning an error string if the operation did not succeed. 
pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Result<(), String> { let client_config = parse_client_config(cli_args, &env)?; @@ -273,6 +304,7 @@ pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Re let inspect_config = parse_inspect_config(cli_args)?; inspect_db(inspect_config, client_config, &context, log) } + ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log), _ => { return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()) } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index b28c1a0c3e7..aed8ebf394c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1227,6 +1227,19 @@ fn compact_db_flag() { .with_config(|config| assert!(config.store.compact_on_init)); } #[test] +fn prune_payloads_on_startup_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.store.prune_payloads_on_init)); +} +#[test] +fn prune_payloads_on_startup_false() { + CommandLineTest::new() + .flag("prune-payloads-on-startup", Some("false")) + .run_with_zero_port() + .with_config(|config| assert!(!config.store.prune_payloads_on_init)); +} +#[test] fn reconstruct_historic_states_flag() { CommandLineTest::new() .flag("reconstruct-historic-states", None) From f0544b4048dd2ac592b4c309832126ed47eed712 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 16 Sep 2022 18:00:40 +1000 Subject: [PATCH 3/4] Use new frozen blocks --- beacon_node/beacon_chain/src/historical_blocks.rs | 2 +- beacon_node/store/src/hot_cold_store.rs | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index f6c10958f28..7d7fc323e91 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -108,7 +108,7 @@ impl BeaconChain { // Store block in the hot database without payload. self.store - .blinded_block_as_cold_kv_store_ops(&block_root, block, &mut cold_batch); + .blinded_block_as_cold_kv_store_ops(&block_root, block, &mut cold_batch)?; // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index b79b9214317..8bc308951d8 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1642,7 +1642,7 @@ pub fn migrate_database, Cold: ItemStore>( } let mut hot_db_ops: Vec> = Vec::new(); - let mut cold_db_block_ops: Vec = vec![]; + let mut cold_db_block_ops: Vec = vec![]; // 1. Copy all of the states between the head and the split slot, from the hot DB // to the cold DB. Delete the execution payloads of these now-finalized blocks. @@ -1682,6 +1682,16 @@ pub fn migrate_database, Cold: ItemStore>( // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split // slot when determining whether to reconstruct payloads. hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); + + // Copy the blinded block from the hot database to the freezer. + let blinded_block = store + .get_hot_blinded_block(&block_root)? + .ok_or(Error::BlockNotFound(block_root))?; + store.blinded_block_as_cold_kv_store_ops( + &block_root, + &blinded_block, + &mut cold_db_block_ops, + )?; } // Warning: Critical section. 
We have to take care not to put any of the two databases in an @@ -1695,6 +1705,7 @@ pub fn migrate_database, Cold: ItemStore>( // exceedingly rare event, this should be an acceptable tradeoff. // Flush to disk all the states that have just been migrated to the cold store. + store.cold_db.do_atomically(cold_db_block_ops)?; store.cold_db.sync()?; { From 854be82bb33b2a5d7dc652f45c18b2c4e8bad92a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 19 Sep 2022 13:38:01 +1000 Subject: [PATCH 4/4] Fix genesis block handling --- beacon_node/beacon_chain/src/builder.rs | 5 +++-- beacon_node/store/src/hot_cold_store.rs | 12 +++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2c5a68388ef..f83cf471b06 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -318,6 +318,7 @@ where .ok_or("set_genesis_state requires a store")?; let beacon_block = genesis_block(&mut beacon_state, &self.spec)?; + let blinded_block = beacon_block.clone_as_blinded(); beacon_state .build_all_caches(&self.spec) @@ -330,12 +331,12 @@ where .put_state(&beacon_state_root, &beacon_state) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; store - .put_block(&beacon_block_root, beacon_block.clone()) + .put_cold_blinded_block(&beacon_block_root, &blinded_block) .map_err(|e| format!("Failed to store genesis block: {:?}", e))?; // Store the genesis block under the `ZERO_HASH` key. store - .put_block(&Hash256::zero(), beacon_block.clone()) + .put_cold_blinded_block(&Hash256::zero(), &blinded_block) .map_err(|e| { format!( "Failed to store genesis block under 0x00..00 alias: {:?}", diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 8bc308951d8..b8706be6e0b 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -410,7 +410,7 @@ impl, Cold: ItemStore> HotColdDB slot: Option, ) -> Result>, Error> { if let Some(slot) = slot { - if slot < self.get_split_slot() { + if slot < self.get_split_slot() || slot == 0 { // To the freezer DB. self.get_cold_blinded_block_by_slot(slot) } else { @@ -469,6 +469,16 @@ impl, Cold: ItemStore> HotColdDB )?)) } + pub fn put_cold_blinded_block( + &self, + block_root: &Hash256, + block: &SignedBlindedBeaconBlock, + ) -> Result<(), Error> { + let mut ops = Vec::with_capacity(2); + self.blinded_block_as_cold_kv_store_ops(block_root, block, &mut ops)?; + self.cold_db.do_atomically(ops) + } + pub fn blinded_block_as_cold_kv_store_ops( &self, block_root: &Hash256,
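Taken together, the four patches route blinded-block reads as follows. A minimal stand-alone sketch of the decision logic in `get_blinded_block` after the final commit (plain `u64` slots and the `BlockSource` enum are illustrative, not types from the diff):

#[derive(Debug, PartialEq)]
enum BlockSource {
    /// Blinded block stored by root in the hot DB.
    Hot,
    /// zstd-compressed block keyed by slot in the `BeaconBlockFrozen` column.
    ColdBySlot,
    /// Root-to-slot lookup via `FrozenBlockSlot`, then the slot-keyed read.
    ColdByRoot,
}

/// Which stores to consult, in order, given an optional slot hint and the split slot.
fn route_block_read(slot_hint: Option<u64>, split_slot: u64) -> Vec<BlockSource> {
    match slot_hint {
        // Finalized blocks (and genesis, after the final commit) live in the freezer.
        Some(slot) if slot < split_slot || slot == 0 => vec![BlockSource::ColdBySlot],
        Some(_) => vec![BlockSource::Hot],
        // Without a hint, try the hot DB first, then fall back to the cold index.
        None => vec![BlockSource::Hot, BlockSource::ColdByRoot],
    }
}

fn main() {
    assert_eq!(route_block_read(Some(0), 1024), vec![BlockSource::ColdBySlot]);
    assert_eq!(route_block_read(Some(2048), 1024), vec![BlockSource::Hot]);
    assert_eq!(
        route_block_read(None, 1024),
        vec![BlockSource::Hot, BlockSource::ColdByRoot]
    );
}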