diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 2954e6c69f9..b1de15cf3fa 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -636,10 +636,21 @@ impl BeaconChain { /// Load persisted ProofEngine state from disk, returning `None` if not found or corrupt. pub fn load_proof_engine_state(store: BeaconStore) -> Option { - match store.get_item::(&PROOF_ENGINE_DB_KEY) { - Ok(Some(persisted)) => { - tracing::info!("Loaded ProofEngine state from disk"); - Some(persisted) + match store + .hot_db + .get_bytes(DBColumn::ProofEngine, PROOF_ENGINE_DB_KEY.as_slice()) + { + Ok(Some(bytes)) => { + match PersistedProofEngineState::from_bytes(&bytes, store.get_config()) { + Ok(persisted) => { + tracing::info!("Loaded ProofEngine state from disk"); + Some(persisted) + } + Err(e) => { + tracing::warn!(error = ?e, "Failed to decode ProofEngine state from disk, starting fresh"); + None + } + } } Ok(None) => None, Err(e) => { diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 91ca50f06de..9f2f3d4c1b4 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -56,8 +56,7 @@ use state_processing::AllCaches; use std::sync::Arc; use std::time::Duration; use store::{ - Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, StoreItem, - iter::StateRootsIterator, + Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator, }; use task_executor::{JoinHandle, ShutdownReason}; use tracing::info_span; @@ -1062,7 +1061,7 @@ impl BeaconChain { let op = proof_engine .to_persisted() - .as_kv_store_op(PROOF_ENGINE_DB_KEY); + .as_kv_store_op(PROOF_ENGINE_DB_KEY, self.store.get_config())?; self.store.hot_db.do_atomically(vec![op])?; Ok(()) } diff --git a/beacon_node/execution_layer/src/eip8025/persisted_state.rs b/beacon_node/execution_layer/src/eip8025/persisted_state.rs index 9ca3373841b..8491fb076e3 100644 --- a/beacon_node/execution_layer/src/eip8025/persisted_state.rs +++ b/beacon_node/execution_layer/src/eip8025/persisted_state.rs @@ -8,7 +8,7 @@ use crate::ForkchoiceState; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::collections::{BTreeMap, HashMap, HashSet}; -use store::{DBColumn, Error as StoreError, StoreItem}; +use store::{DBColumn, Error as StoreError, KeyValueStoreOp, StoreConfig, StoreItem}; use types::{ExecutionBlockHash, Hash256, SignedExecutionProof}; /// Version field for future format migrations within the ProofEngine state. @@ -90,6 +90,35 @@ impl StoreItem for PersistedProofEngineState { } impl PersistedProofEngineState { + /// Decompress and decode from bytes using zstd (mirrors `PersistedForkChoiceV28::from_bytes`). + pub fn from_bytes(bytes: &[u8], store_config: &StoreConfig) -> Result { + let decompressed_bytes = store_config + .decompress_bytes(bytes) + .map_err(StoreError::Compression)?; + Self::from_ssz_bytes(&decompressed_bytes).map_err(Into::into) + } + + /// Encode and compress to bytes using zstd (mirrors `PersistedForkChoiceV28::as_bytes`). + pub fn as_bytes(&self, store_config: &StoreConfig) -> Result, StoreError> { + let ssz_bytes = self.as_ssz_bytes(); + store_config + .compress_bytes(&ssz_bytes) + .map_err(StoreError::Compression) + } + + /// Produce a compressed `KeyValueStoreOp` for atomic persistence. + pub fn as_kv_store_op( + &self, + key: Hash256, + store_config: &StoreConfig, + ) -> Result { + Ok(KeyValueStoreOp::PutKeyValue( + DBColumn::ProofEngine, + key.as_slice().to_vec(), + self.as_bytes(store_config)?, + )) + } + pub fn from_state(state: &State) -> Self { Self { version: PROOF_ENGINE_STATE_VERSION, @@ -411,4 +440,54 @@ mod tests { ); assert_eq!(decoded.buffer.requests, persisted.buffer.requests); } + + /// Verifies that compress → decompress round-trip via `as_bytes`/`from_bytes` + /// preserves all fields, and that compressed output is smaller than raw SSZ. + #[test] + fn test_compressed_round_trip() { + let fixture = TestStateFixtureBuilder::simple_chain() + .with_fork(1, 2, Some(0)) + .with_fork(1, 3, Some(3)) + .build(); + + let mut state = State::new(); + fixture.bootstrap_canonical(&mut state).unwrap(); + fixture.insert_fork(&mut state, 0, None).unwrap(); + + let head = fixture.canonical_block_hash(2); + let safe = fixture.canonical_block_hash(1); + let finalized = fixture.canonical_block_hash(0); + state + .forkchoice_updated(create_forkchoice_state(head, safe, finalized)) + .unwrap(); + + let persisted = PersistedProofEngineState::from_state(&state); + let store_config = StoreConfig::default(); + + // Compress. + let compressed = persisted.as_bytes(&store_config).unwrap(); + let raw_ssz = persisted.as_store_bytes(); + + // Compressed should differ from raw SSZ (zstd adds framing even if not smaller). + assert_ne!(compressed, raw_ssz); + + // Decompress and verify equality. + let decoded = PersistedProofEngineState::from_bytes(&compressed, &store_config).unwrap(); + assert_eq!(decoded.version, persisted.version); + assert_eq!(decoded.last_valid_fcs, persisted.last_valid_fcs); + assert_eq!(decoded.latest_fcs, persisted.latest_fcs); + assert_eq!( + decoded.tree.proofs_by_block_hash, + persisted.tree.proofs_by_block_hash + ); + assert_eq!( + decoded.tree.request_root_to_block_hash, + persisted.tree.request_root_to_block_hash + ); + assert_eq!( + decoded.tree.current_canonical_head, + persisted.tree.current_canonical_head + ); + assert_eq!(decoded.buffer.requests, persisted.buffer.requests); + } }