Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/consensus/parlia/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{address, b256, Bytes};
use alloy_primitives::Bytes;

// Mock header for testing
struct MockHeader {
Expand Down
26 changes: 25 additions & 1 deletion src/consensus/parlia/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use reth_provider::BlockExecutionResult;
use reth_primitives_traits::{Block, SealedBlock, SealedHeader, RecoveredBlock};
use reth_chainspec::EthChainSpec;
use std::sync::Arc;
use std::collections::HashMap;

/// Enhanced Parlia consensus that implements proper pre/post execution validation
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -277,7 +278,7 @@ where
tracing::debug!("Epoch boundary at block {}", header.number);
}

tracing::debug!("✅ [BSC] Full post-execution validation completed for block {}", header.number);
tracing::debug!("Succeed to finish full post-execution validation for block {}", header.number);
Ok(())
}

Expand Down Expand Up @@ -487,6 +488,29 @@ where
}
}

/// Object-safe bridge: exposes the execution-facing `verify_cascading_fields`
/// API on the concrete `ParliaConsensus` so it can be held as a
/// `dyn ParliaConsensusObject` globally (see trait definition in `mod.rs`).
impl<ChainSpec, P> super::ParliaConsensusObject for ParliaConsensus<ChainSpec, P>
where
    ChainSpec: EthChainSpec + BscHardforks + 'static,
    P: SnapshotProvider + std::fmt::Debug + 'static,
{
    /// Verifies header fields that depend on the parent/snapshot chain by
    /// delegating to the inner `consensus_validator`.
    ///
    /// The trait works on plain `Header`s, while the validator wants
    /// `SealedHeader`s, so we recompute each hash as keccak256 of the RLP
    /// encoding and seal a clone. NOTE(review): this clones and re-hashes both
    /// headers on every call — if callers already hold `SealedHeader`s, an
    /// overload avoiding the recompute may be worthwhile; confirm against the
    /// call sites.
    fn verify_cascading_fields(
        &self,
        header: &Header,
        parent: &Header,
        // Ancestor map is currently unused; `None` is forwarded below.
        _ancestor: Option<&HashMap<alloy_primitives::B256, SealedHeader<Header>>>,
        snap: &Snapshot,
    ) -> Result<(), reth_evm::execute::BlockExecutionError> {
        let header_hash = alloy_primitives::keccak256(alloy_rlp::encode(header));
        let parent_hash = alloy_primitives::keccak256(alloy_rlp::encode(parent));
        let sealed_header = SealedHeader::new(header.clone(), header_hash);
        let sealed_parent = SealedHeader::new(parent.clone(), parent_hash);

        // Map the validator's consensus error into a BlockExecutionError so the
        // signature matches the execution-facing trait.
        self.consensus_validator
            .verify_cascading_fields(&sealed_header, &sealed_parent, None, snap)
            .map_err(|e| reth_evm::execute::BlockExecutionError::msg(format!("{}", e)))
    }
}

impl<ChainSpec, P> HeaderValidator<Header> for ParliaConsensus<ChainSpec, P>
where
ChainSpec: EthChainSpec + BscHardforks + 'static,
Expand Down
20 changes: 18 additions & 2 deletions src/consensus/parlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,31 @@ pub mod util;

pub use snapshot::{Snapshot, ValidatorInfo, CHECKPOINT_INTERVAL};
pub use vote::{VoteAddress, VoteAttestation, VoteData, VoteEnvelope, VoteSignature, ValidatorsBitSet};
pub use provider::InMemorySnapshotProvider;
pub use constants::*;
pub use attestation::parse_vote_attestation_from_header;
pub use validator::{ParliaHeaderValidator, SnapshotProvider};
pub use validator::ParliaHeaderValidator;
pub use validation::BscConsensusValidator;
pub use hertz_patch::{HertzPatchManager, StoragePatch};
pub use transaction_splitter::{TransactionSplitter, SplitTransactions, TransactionSplitterError};
pub use consensus::ParliaConsensus;
pub use util::hash_with_chain_id;
pub use provider::SnapshotProvider;

// A single object-safe trait to represent the Parlia consensus object when held globally.
// This combines the execution-facing validator API with the consensus engine trait.
/// A single object-safe trait representing the Parlia consensus object when it
/// is held globally. It combines the execution-facing validator API (the
/// `verify_cascading_fields` method below) with the full consensus engine
/// supertrait, so one trait object can serve both roles.
pub trait ParliaConsensusObject:
    reth::consensus::FullConsensus<crate::BscPrimitives, Error = reth::consensus::ConsensusError>
{
    /// Verifies header fields that cascade from the parent chain against the
    /// given Parlia `snap`shot.
    ///
    /// `ancestor` optionally supplies already-known headers keyed by hash;
    /// implementations may ignore it (the `ParliaConsensus` impl currently
    /// forwards `None`).
    fn verify_cascading_fields(
        &self,
        header: &alloy_consensus::Header,
        parent: &alloy_consensus::Header,
        ancestor: Option<&std::collections::HashMap<alloy_primitives::B256, reth_primitives_traits::SealedHeader>>,
        snap: &Snapshot,
    ) -> Result<(), reth_evm::execute::BlockExecutionError>;
}

// Note: concrete implementation is provided for `ParliaConsensus` in `consensus.rs`

/// Epoch length (200 blocks on BSC main-net).
pub const EPOCH: u64 = 200;
Expand Down
126 changes: 31 additions & 95 deletions src/consensus/parlia/provider.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use super::snapshot::Snapshot;
use super::validator::SnapshotProvider;
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::sync::Arc;
use reth_provider::{HeaderProvider, BlockReader};
use crate::chainspec::BscChainSpec;
Expand All @@ -16,77 +14,6 @@ pub trait OnDemandSnapshotCreator {
fn create_snapshot_on_demand(&self, target_block_number: u64) -> Option<Snapshot>;
}

/// Very simple `SnapshotProvider` that keeps the most recent `max_entries` snapshots in memory.
/// Keys are the **block number** the snapshot is valid for (i.e. the last block of the snapshot’s
/// epoch). For historical sync this is sufficient – we can switch to an MDBX-backed provider later.
#[derive(Clone, Debug)]
pub struct InMemorySnapshotProvider {
    /// Snapshots keyed by the block number they are valid for; `BTreeMap` so
    /// we can range-scan for the greatest key <= a requested block.
    inner: Arc<RwLock<BTreeMap<u64, Snapshot>>>,
    /// Upper bound on retained snapshots; oldest entries are evicted first.
    max_entries: usize,
}

impl InMemorySnapshotProvider {
    /// Create a new provider keeping at most `max_entries` snapshots.
    /// The map is wrapped in `Arc<RwLock<..>>` so clones of the provider
    /// share one cache.
    pub fn new(max_entries: usize) -> Self {
        Self { inner: Arc::new(RwLock::new(BTreeMap::new())), max_entries }
    }
}

impl Default for InMemorySnapshotProvider {
fn default() -> Self { Self::new(2048) }
}

impl SnapshotProvider for InMemorySnapshotProvider {
    /// Returns the snapshot valid for `block_number`: the entry with the
    /// greatest key that is <= `block_number`, cloned out of the cache.
    fn snapshot(&self, block_number: u64) -> Option<Snapshot> {
        let guard = self.inner.read();
        // InMemorySnapshotProvider::snapshot called

        // Find the greatest key <= block_number.
        if let Some((found_block, snap)) = guard.range(..=block_number).next_back() {
            tracing::info!("✅ [BSC-PROVIDER] Found snapshot for block {} (requested {}): validators={}, epoch_num={}",
                found_block, block_number, snap.validators.len(), snap.epoch_num);
            return Some(snap.clone());
        }

        tracing::warn!("⚠️ [BSC-PROVIDER] No snapshot found for block {}", block_number);
        None
    }

    /// Inserts (or replaces) `snapshot`, keyed by its own `block_number`, then
    /// evicts the lowest-numbered entries until the cache fits `max_entries`.
    fn insert(&self, snapshot: Snapshot) {
        let mut guard = self.inner.write();
        guard.insert(snapshot.block_number, snapshot.clone());

        // clamp size
        while guard.len() > self.max_entries {
            // remove the smallest key (oldest snapshot) while over capacity
            if let Some(first_key) = guard.keys().next().cloned() {
                // Removing old snapshot (cache full)
                guard.remove(&first_key);
            }
        }
        // Cache updated
    }

    /// Always `None`: this provider only caches snapshots and has no access to
    /// chain headers for checkpoint parsing.
    fn get_checkpoint_header(&self, _block_number: u64) -> Option<alloy_consensus::Header> {
        // InMemorySnapshotProvider doesn't have access to headers
        None
    }
}

/// Blanket convenience impl: an `Arc`-wrapped provider delegates every call
/// straight through to the inner `InMemorySnapshotProvider`.
impl SnapshotProvider for Arc<InMemorySnapshotProvider> {
    fn snapshot(&self, block_number: u64) -> Option<Snapshot> {
        self.as_ref().snapshot(block_number)
    }

    fn insert(&self, snapshot: Snapshot) {
        self.as_ref().insert(snapshot)
    }

    fn get_checkpoint_header(&self, block_number: u64) -> Option<alloy_consensus::Header> {
        self.as_ref().get_checkpoint_header(block_number)
    }
}

// ---------------------------------------------------------------------------
// MDBX‐backed snapshot provider with LRU front‐cache
// ---------------------------------------------------------------------------
Expand All @@ -98,6 +25,17 @@ use reth_db::transaction::{DbTx, DbTxMut};
use reth_db::cursor::DbCursorRO;
use schnellru::{ByLength, LruMap};

/// Storage abstraction for Parlia validator-set snapshots. Implementations
/// range from pure in-memory caches to the MDBX-backed `DbSnapshotProvider`;
/// `Send + Sync` so a provider can be shared across sync stages.
pub trait SnapshotProvider: Send + Sync {
    /// Returns the snapshot that is valid for the given `block_number` (usually parent block).
    fn snapshot(&self, block_number: u64) -> Option<Snapshot>;

    /// Inserts (or replaces) the snapshot in the provider.
    fn insert(&self, snapshot: Snapshot);

    /// Fetches header by block number for checkpoint parsing (like reth-bsc-trail's get_header_by_hash).
    /// Providers without header access may return `None`.
    fn get_checkpoint_header(&self, block_number: u64) -> Option<alloy_consensus::Header>;
}

/// `DbSnapshotProvider` wraps an MDBX database; it keeps a small in-memory LRU to avoid hitting
/// storage for hot epochs. The DB layer persists snapshots as CBOR blobs via the `ParliaSnapshots`
/// table that is already defined in `db.rs`.
Expand Down Expand Up @@ -172,12 +110,12 @@ impl<DB: Database> DbSnapshotProvider<DB> {
if let Ok(Some(raw_blob)) = tx.get::<crate::consensus::parlia::db::ParliaSnapshots>(block_number) {
let raw = &raw_blob.0;
if let Ok(decoded) = Snapshot::decompress(raw) {
tracing::debug!("✅ [BSC] Found exact snapshot for block {} in DB (snapshot_block={})", block_number, decoded.block_number);
tracing::debug!("Succeed to find snapshot for block {} from DB (snapshot_block={})", block_number, decoded.block_number);
return Some(decoded);
}
}

tracing::debug!("🔍 [BSC] No exact snapshot for block {}, searching for fallback...", block_number);
tracing::debug!("Failed to find snapshot for block {}, searching for fallback...", block_number);

// If exact snapshot not found, look for the most recent snapshot before this block
let mut cursor = tx
Expand All @@ -191,25 +129,24 @@ impl<DB: Database> DbSnapshotProvider<DB> {
let raw = &raw_blob.0;
if let Ok(decoded) = Snapshot::decompress(raw) {
found_count += 1;
tracing::debug!("🔍 [BSC] Found snapshot in DB: block {} -> snapshot_block {}", db_block_num, decoded.block_number);
tracing::trace!("Scan snapshot in DB, block {} -> snapshot_block {}", db_block_num, decoded.block_number);
last = Some(decoded);
}
}

if let Some(ref snap) = last {
tracing::debug!("✅ [BSC] Selected fallback snapshot for block {} at block {} in DB (searched {} snapshots)", block_number, snap.block_number, found_count);
tracing::debug!("Succeed to find fallback snapshot for block {} at block {} in DB (searched {} snapshots)", block_number, snap.block_number, found_count);
} else {
tracing::debug!("❌ [BSC] No fallback snapshot found for block {} in DB", block_number);
tracing::debug!("Failed to find snapshot for block {} from DB", block_number);
}
last
}

fn persist_to_db(&self, snap: &Snapshot) -> Result<(), DatabaseError> {
tracing::debug!("💾 [BSC] Starting DB persist for snapshot block {}", snap.block_number);
let tx = self.db.tx_mut()?;
tx.put::<crate::consensus::parlia::db::ParliaSnapshots>(snap.block_number, ParliaSnapshotBlob(snap.clone().compress()))?;
tx.commit()?;
tracing::debug!("✅ [BSC] Successfully committed snapshot block {} to DB", snap.block_number);
tracing::debug!("Succeed to insert snapshot block {} to DB", snap.block_number);
Ok(())
}
}
Expand Down Expand Up @@ -237,18 +174,19 @@ impl<DB: Database + 'static> SnapshotProvider for DbSnapshotProvider<DB> {
if snapshot.block_number % crate::consensus::parlia::snapshot::CHECKPOINT_INTERVAL == 0 {
match self.persist_to_db(&snapshot) {
Ok(()) => {
tracing::debug!("✅ [BSC] Successfully persisted snapshot for block {} to DB", snapshot.block_number);
tracing::debug!("Succeed to persist snapshot for block {} to DB", snapshot.block_number);
},
Err(e) => {
tracing::error!("❌ [BSC] Failed to persist snapshot for block {} to DB: {}", snapshot.block_number, e);
tracing::error!("Failed to persist snapshot for block {} to DB due to {:?}", snapshot.block_number, e);
}
}
}
}

fn get_checkpoint_header(&self, _block_number: u64) -> Option<alloy_consensus::Header> {
tracing::info!("Get checkpoint header for block {} in DbSnapshotProvider", _block_number);
// DbSnapshotProvider doesn't have access to headers
None
unimplemented!("DbSnapshotProvider doesn't have access to headers");
}
}

Expand All @@ -262,7 +200,7 @@ where
{
let mut cache_guard = self.base.cache.write();
if let Some(cached_snap) = cache_guard.get(&block_number) {
tracing::debug!("✅ [BSC] Cache hit for snapshot request {} -> found snapshot for block {}", block_number, cached_snap.block_number);
tracing::debug!("Succeed to query snapshot from cache, request {} -> found snapshot for block {}", block_number, cached_snap.block_number);
return Some(cached_snap.clone());
}
}
Expand Down Expand Up @@ -357,10 +295,7 @@ where
let parsed = super::validator::parse_epoch_update(checkpoint_header,
self.chain_spec.is_luban_active_at_block(checkpoint_header.number),
self.chain_spec.is_bohr_active_at_timestamp(checkpoint_header.timestamp)
);

// Validator set parsed from checkpoint header

);
parsed
} else {
tracing::warn!("⚠️ [BSC] Checkpoint header for block {} not found in headers_to_apply list", checkpoint_block_number);
Expand Down Expand Up @@ -391,18 +326,15 @@ where
Some(snap) => snap,
None => {
if header.number % 100000 == 0 { // only log every 100k blocks to reduce spam
tracing::debug!("🔄 [BSC] Failed to apply header {} to snapshot during Bodies stage", header.number);
tracing::debug!("Failed to apply header {} to snapshot during Bodies stage", header.number);
}
return None;
}
};

// Cache intermediate snapshots (like reth-bsc-trail)
self.base.cache.write().insert(working_snapshot.block_number, working_snapshot.clone());

// Persist checkpoint snapshots to database (like reth-bsc-trail)
if working_snapshot.block_number % crate::consensus::parlia::snapshot::CHECKPOINT_INTERVAL == 0 {
// Persisting checkpoint snapshot
tracing::debug!("Succeed to rebuild snapshot for block {} to DB", working_snapshot.block_number);
self.base.insert(working_snapshot.clone());
}
}
Expand All @@ -416,12 +348,16 @@ where
}

fn get_checkpoint_header(&self, block_number: u64) -> Option<alloy_consensus::Header> {
tracing::info!("Get checkpoint header for block {} in enhanced snapshot provider", block_number);
// Use the provider to fetch header from database (like reth-bsc-trail's get_header_by_hash)
use reth_provider::HeaderProvider;
match self.header_provider.header_by_number(block_number) {
Ok(header) => header,
Ok(header) => {
tracing::info!("Succeed to fetch header{} for block {} in enhanced snapshot provider", header.is_none(),block_number);
header
},
Err(e) => {
tracing::error!("❌ [BSC] Failed to fetch header for block {}: {:?}", block_number, e);
tracing::error!("Failed to fetch header for block {}: {:?}", block_number, e);
None
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/consensus/parlia/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::{BTreeMap, HashMap};

//use crate::consensus::parlia::TURN_LENGTH_SIZE;

use super::vote::{VoteAddress, VoteAttestation, VoteData};
use alloy_primitives::{Address, BlockNumber, B256};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -59,7 +61,7 @@ pub struct Snapshot {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_length: Option<u8>,

/// Expected block interval in seconds.
/// Expected block interval in milliseconds.
pub block_interval: u64,
}

Expand Down Expand Up @@ -281,14 +283,14 @@ impl Snapshot {

/// Validator that should propose the **next** block.
pub fn inturn_validator(&self) -> Address {
let turn = u64::from(self.turn_length.unwrap_or(DEFAULT_TURN_LENGTH));
let turn_length = u64::from(self.turn_length.unwrap_or(DEFAULT_TURN_LENGTH));
let next_block = self.block_number + 1;
let offset = (next_block / turn) as usize % self.validators.len();
let offset = (next_block / turn_length) as usize % self.validators.len();
let next_validator = self.validators[offset];

tracing::debug!(
"🔢 [BSC] inturn_validator calculation: snapshot_block={}, next_block={}, turn={}, offset={}, validators_len={}, next_validator=0x{:x}",
self.block_number, next_block, turn, offset, self.validators.len(), next_validator
"inturn_validator debug info, snapshot_block={}, next_block={}, turn_length={}, offset={}, validators_len={}, next_validator=0x{:x}",
self.block_number, next_block, turn_length, offset, self.validators.len(), next_validator
);

next_validator
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/parlia/tests/snapshot_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::super::{
provider::DbSnapshotProvider,
snapshot::Snapshot,
validator::SnapshotProvider,
provider::SnapshotProvider,
};
use alloy_primitives::{Address, B256};
use reth_db::{init_db, mdbx::DatabaseArguments, Database, transaction::DbTx, cursor::DbCursorRO};
Expand Down
20 changes: 20 additions & 0 deletions src/consensus/parlia/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,24 @@ fn rlp_header(header: &Header, chain_id: u64) -> alloy_rlp::Header {
}
}
rlp_head
}


/// Reconstructs a header's timestamp in **milliseconds**.
///
/// BSC packs the sub-second part of the block time into the otherwise-unused
/// `mix_hash` field: the last 8 bytes of the 32-byte digest, read as a
/// big-endian integer (equivalent to Go's `uint256.SetBytes32(..).Uint64()`),
/// hold the millisecond remainder. A zero `mix_hash` means no millisecond
/// component is encoded.
///
/// Returns `header.timestamp * 1000 + ms_part`.
/// NOTE(review): assumes the encoded remainder is a sane millisecond value and
/// the product fits in `u64` — holds for real chain data, not enforced here.
pub fn calculate_millisecond_timestamp(header: &Header) -> u64 {
    let seconds = header.timestamp;

    let ms_part = if header.mix_hash == B256::ZERO {
        0
    } else {
        // Take the low 8 of the 32 digest bytes and decode them big-endian.
        let tail: [u8; 8] = header.mix_hash.as_slice()[24..32]
            .try_into()
            .expect("B256 is exactly 32 bytes");
        u64::from_be_bytes(tail)
    };

    seconds * 1000 + ms_part
}
Loading