Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,10 +636,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Load persisted ProofEngine state from disk, returning `None` if not found or corrupt.
pub fn load_proof_engine_state(store: BeaconStore<T>) -> Option<PersistedProofEngineState> {
match store.get_item::<PersistedProofEngineState>(&PROOF_ENGINE_DB_KEY) {
Ok(Some(persisted)) => {
tracing::info!("Loaded ProofEngine state from disk");
Some(persisted)
match store
.hot_db
.get_bytes(DBColumn::ProofEngine, PROOF_ENGINE_DB_KEY.as_slice())
{
Ok(Some(bytes)) => {
match PersistedProofEngineState::from_bytes(&bytes, store.get_config()) {
Ok(persisted) => {
tracing::info!("Loaded ProofEngine state from disk");
Some(persisted)
}
Err(e) => {
tracing::warn!(error = ?e, "Failed to decode ProofEngine state from disk, starting fresh");
None
}
}
}
Ok(None) => None,
Err(e) => {
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ use state_processing::AllCaches;
use std::sync::Arc;
use std::time::Duration;
use store::{
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, StoreItem,
iter::StateRootsIterator,
Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreConfig, iter::StateRootsIterator,
};
use task_executor::{JoinHandle, ShutdownReason};
use tracing::info_span;
Expand Down Expand Up @@ -1062,7 +1061,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let op = proof_engine
.to_persisted()
.as_kv_store_op(PROOF_ENGINE_DB_KEY);
.as_kv_store_op(PROOF_ENGINE_DB_KEY, self.store.get_config())?;
self.store.hot_db.do_atomically(vec![op])?;
Ok(())
}
Expand Down
81 changes: 80 additions & 1 deletion beacon_node/execution_layer/src/eip8025/persisted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::ForkchoiceState;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::{BTreeMap, HashMap, HashSet};
use store::{DBColumn, Error as StoreError, StoreItem};
use store::{DBColumn, Error as StoreError, KeyValueStoreOp, StoreConfig, StoreItem};
use types::{ExecutionBlockHash, Hash256, SignedExecutionProof};

/// Version field for future format migrations within the ProofEngine state.
Expand Down Expand Up @@ -90,6 +90,35 @@ impl StoreItem for PersistedProofEngineState {
}

impl PersistedProofEngineState {
/// Decompress and decode from bytes using zstd (mirrors `PersistedForkChoiceV28::from_bytes`).
pub fn from_bytes(bytes: &[u8], store_config: &StoreConfig) -> Result<Self, StoreError> {
let decompressed_bytes = store_config
.decompress_bytes(bytes)
.map_err(StoreError::Compression)?;
Self::from_ssz_bytes(&decompressed_bytes).map_err(Into::into)
}

/// Encode and compress to bytes using zstd (mirrors `PersistedForkChoiceV28::as_bytes`).
pub fn as_bytes(&self, store_config: &StoreConfig) -> Result<Vec<u8>, StoreError> {
let ssz_bytes = self.as_ssz_bytes();
store_config
.compress_bytes(&ssz_bytes)
.map_err(StoreError::Compression)
}

/// Produce a compressed `KeyValueStoreOp` for atomic persistence.
pub fn as_kv_store_op(
&self,
key: Hash256,
store_config: &StoreConfig,
) -> Result<KeyValueStoreOp, StoreError> {
Ok(KeyValueStoreOp::PutKeyValue(
DBColumn::ProofEngine,
key.as_slice().to_vec(),
self.as_bytes(store_config)?,
))
}

pub fn from_state(state: &State) -> Self {
Self {
version: PROOF_ENGINE_STATE_VERSION,
Expand Down Expand Up @@ -411,4 +440,54 @@ mod tests {
);
assert_eq!(decoded.buffer.requests, persisted.buffer.requests);
}

/// Verifies that compress → decompress round-trip via `as_bytes`/`from_bytes`
/// preserves all fields, and that compressed output is smaller than raw SSZ.
#[test]
fn test_compressed_round_trip() {
let fixture = TestStateFixtureBuilder::simple_chain()
.with_fork(1, 2, Some(0))
.with_fork(1, 3, Some(3))
.build();

let mut state = State::new();
fixture.bootstrap_canonical(&mut state).unwrap();
fixture.insert_fork(&mut state, 0, None).unwrap();

let head = fixture.canonical_block_hash(2);
let safe = fixture.canonical_block_hash(1);
let finalized = fixture.canonical_block_hash(0);
state
.forkchoice_updated(create_forkchoice_state(head, safe, finalized))
.unwrap();

let persisted = PersistedProofEngineState::from_state(&state);
let store_config = StoreConfig::default();

// Compress.
let compressed = persisted.as_bytes(&store_config).unwrap();
let raw_ssz = persisted.as_store_bytes();

// Compressed should differ from raw SSZ (zstd adds framing even if not smaller).
assert_ne!(compressed, raw_ssz);

// Decompress and verify equality.
let decoded = PersistedProofEngineState::from_bytes(&compressed, &store_config).unwrap();
assert_eq!(decoded.version, persisted.version);
assert_eq!(decoded.last_valid_fcs, persisted.last_valid_fcs);
assert_eq!(decoded.latest_fcs, persisted.latest_fcs);
assert_eq!(
decoded.tree.proofs_by_block_hash,
persisted.tree.proofs_by_block_hash
);
assert_eq!(
decoded.tree.request_root_to_block_hash,
persisted.tree.request_root_to_block_hash
);
assert_eq!(
decoded.tree.current_canonical_head,
persisted.tree.current_canonical_head
);
assert_eq!(decoded.buffer.requests, persisted.buffer.requests);
}
}
Loading