diff --git a/src/consensus/parlia/attestation.rs b/src/consensus/parlia/attestation.rs index 70e67cd..8b59ce9 100644 --- a/src/consensus/parlia/attestation.rs +++ b/src/consensus/parlia/attestation.rs @@ -64,7 +64,7 @@ where #[cfg(test)] mod tests { use super::*; - use alloy_primitives::{address, b256, Bytes}; + use alloy_primitives::Bytes; // Mock header for testing struct MockHeader { diff --git a/src/consensus/parlia/consensus.rs b/src/consensus/parlia/consensus.rs index f6463f4..6d2de34 100644 --- a/src/consensus/parlia/consensus.rs +++ b/src/consensus/parlia/consensus.rs @@ -14,6 +14,7 @@ use reth_provider::BlockExecutionResult; use reth_primitives_traits::{Block, SealedBlock, SealedHeader, RecoveredBlock}; use reth_chainspec::EthChainSpec; use std::sync::Arc; +use std::collections::HashMap; /// Enhanced Parlia consensus that implements proper pre/post execution validation #[derive(Debug, Clone)] @@ -277,7 +278,7 @@ where tracing::debug!("Epoch boundary at block {}", header.number); } - tracing::debug!("✅ [BSC] Full post-execution validation completed for block {}", header.number); + tracing::debug!("Succeed to finish full post-execution validation for block {}", header.number); Ok(()) } @@ -487,6 +488,29 @@ where } } +impl super::ParliaConsensusObject for ParliaConsensus +where + ChainSpec: EthChainSpec + BscHardforks + 'static, + P: SnapshotProvider + std::fmt::Debug + 'static, +{ + fn verify_cascading_fields( + &self, + header: &Header, + parent: &Header, + _ancestor: Option<&HashMap>>, + snap: &Snapshot, + ) -> Result<(), reth_evm::execute::BlockExecutionError> { + let header_hash = alloy_primitives::keccak256(alloy_rlp::encode(header)); + let parent_hash = alloy_primitives::keccak256(alloy_rlp::encode(parent)); + let sealed_header = SealedHeader::new(header.clone(), header_hash); + let sealed_parent = SealedHeader::new(parent.clone(), parent_hash); + + self.consensus_validator + .verify_cascading_fields(&sealed_header, &sealed_parent, None, snap) + .map_err(|e| reth_evm::execute::BlockExecutionError::msg(format!("{}", e))) + } +} + impl HeaderValidator
for ParliaConsensus where ChainSpec: EthChainSpec + BscHardforks + 'static, diff --git a/src/consensus/parlia/mod.rs b/src/consensus/parlia/mod.rs index 7e34b0a..d3deb93 100644 --- a/src/consensus/parlia/mod.rs +++ b/src/consensus/parlia/mod.rs @@ -22,15 +22,31 @@ pub mod util; pub use snapshot::{Snapshot, ValidatorInfo, CHECKPOINT_INTERVAL}; pub use vote::{VoteAddress, VoteAttestation, VoteData, VoteEnvelope, VoteSignature, ValidatorsBitSet}; -pub use provider::InMemorySnapshotProvider; pub use constants::*; pub use attestation::parse_vote_attestation_from_header; -pub use validator::{ParliaHeaderValidator, SnapshotProvider}; +pub use validator::ParliaHeaderValidator; pub use validation::BscConsensusValidator; pub use hertz_patch::{HertzPatchManager, StoragePatch}; pub use transaction_splitter::{TransactionSplitter, SplitTransactions, TransactionSplitterError}; pub use consensus::ParliaConsensus; pub use util::hash_with_chain_id; +pub use provider::SnapshotProvider; + +// A single object-safe trait to represent the Parlia consensus object when held globally. +// This combines the execution-facing validator API with the consensus engine trait. +pub trait ParliaConsensusObject: + reth::consensus::FullConsensus +{ + fn verify_cascading_fields( + &self, + header: &alloy_consensus::Header, + parent: &alloy_consensus::Header, + ancestor: Option<&std::collections::HashMap>, + snap: &Snapshot, + ) -> Result<(), reth_evm::execute::BlockExecutionError>; +} + +// Note: concrete implementation is provided for `ParliaConsensus` in `consensus.rs` /// Epoch length (200 blocks on BSC main-net). pub const EPOCH: u64 = 200; diff --git a/src/consensus/parlia/provider.rs b/src/consensus/parlia/provider.rs index 270bcca..fa3d3d8 100644 --- a/src/consensus/parlia/provider.rs +++ b/src/consensus/parlia/provider.rs @@ -1,7 +1,5 @@ use super::snapshot::Snapshot; -use super::validator::SnapshotProvider; use parking_lot::RwLock; -use std::collections::BTreeMap; use std::sync::Arc; use reth_provider::{HeaderProvider, BlockReader}; use crate::chainspec::BscChainSpec; @@ -16,77 +14,6 @@ pub trait OnDemandSnapshotCreator { fn create_snapshot_on_demand(&self, target_block_number: u64) -> Option; } -/// Very simple `SnapshotProvider` that keeps the most recent `max_entries` snapshots in memory. -/// Keys are the **block number** the snapshot is valid for (i.e. the last block of the snapshot’s -/// epoch). For historical sync this is sufficient – we can switch to an MDBX-backed provider later. -#[derive(Clone, Debug)] -pub struct InMemorySnapshotProvider { - inner: Arc>>, - max_entries: usize, -} - -impl InMemorySnapshotProvider { - /// Create a new provider keeping at most `max_entries` snapshots. - pub fn new(max_entries: usize) -> Self { - Self { inner: Arc::new(RwLock::new(BTreeMap::new())), max_entries } - } -} - -impl Default for InMemorySnapshotProvider { - fn default() -> Self { Self::new(2048) } -} - -impl SnapshotProvider for InMemorySnapshotProvider { - fn snapshot(&self, block_number: u64) -> Option { - let guard = self.inner.read(); - // InMemorySnapshotProvider::snapshot called - - // Find the greatest key <= block_number. - if let Some((found_block, snap)) = guard.range(..=block_number).next_back() { - tracing::info!("✅ [BSC-PROVIDER] Found snapshot for block {} (requested {}): validators={}, epoch_num={}", - found_block, block_number, snap.validators.len(), snap.epoch_num); - return Some(snap.clone()); - } - - tracing::warn!("⚠️ [BSC-PROVIDER] No snapshot found for block {}", block_number); - None - } - - fn insert(&self, snapshot: Snapshot) { - let mut guard = self.inner.write(); - guard.insert(snapshot.block_number, snapshot.clone()); - - // clamp size - while guard.len() > self.max_entries { - // remove the smallest key - if let Some(first_key) = guard.keys().next().cloned() { - // Removing old snapshot (cache full) - guard.remove(&first_key); - } - } - // Cache updated - } - - fn get_checkpoint_header(&self, _block_number: u64) -> Option { - // InMemorySnapshotProvider doesn't have access to headers - None - } -} - -impl SnapshotProvider for Arc { - fn snapshot(&self, block_number: u64) -> Option { - (**self).snapshot(block_number) - } - - fn insert(&self, snapshot: Snapshot) { - (**self).insert(snapshot) - } - - fn get_checkpoint_header(&self, block_number: u64) -> Option { - (**self).get_checkpoint_header(block_number) - } -} - // --------------------------------------------------------------------------- // MDBX‐backed snapshot provider with LRU front‐cache // --------------------------------------------------------------------------- @@ -98,6 +25,17 @@ use reth_db::transaction::{DbTx, DbTxMut}; use reth_db::cursor::DbCursorRO; use schnellru::{ByLength, LruMap}; +pub trait SnapshotProvider: Send + Sync { + /// Returns the snapshot that is valid for the given `block_number` (usually parent block). + fn snapshot(&self, block_number: u64) -> Option; + + /// Inserts (or replaces) the snapshot in the provider. + fn insert(&self, snapshot: Snapshot); + + /// Fetches header by block number for checkpoint parsing (like reth-bsc-trail's get_header_by_hash) + fn get_checkpoint_header(&self, block_number: u64) -> Option; +} + /// `DbSnapshotProvider` wraps an MDBX database; it keeps a small in-memory LRU to avoid hitting /// storage for hot epochs. The DB layer persists snapshots as CBOR blobs via the `ParliaSnapshots` /// table that is already defined in `db.rs`. @@ -172,12 +110,12 @@ impl DbSnapshotProvider { if let Ok(Some(raw_blob)) = tx.get::(block_number) { let raw = &raw_blob.0; if let Ok(decoded) = Snapshot::decompress(raw) { - tracing::debug!("✅ [BSC] Found exact snapshot for block {} in DB (snapshot_block={})", block_number, decoded.block_number); + tracing::debug!("Succeed to find snapshot for block {} from DB (snapshot_block={})", block_number, decoded.block_number); return Some(decoded); } } - tracing::debug!("🔍 [BSC] No exact snapshot for block {}, searching for fallback...", block_number); + tracing::debug!("Failed to find snapshot for block {}, searching for fallback...", block_number); // If exact snapshot not found, look for the most recent snapshot before this block let mut cursor = tx @@ -191,25 +129,24 @@ impl DbSnapshotProvider { let raw = &raw_blob.0; if let Ok(decoded) = Snapshot::decompress(raw) { found_count += 1; - tracing::debug!("🔍 [BSC] Found snapshot in DB: block {} -> snapshot_block {}", db_block_num, decoded.block_number); + tracing::trace!("Scan snapshot in DB, block {} -> snapshot_block {}", db_block_num, decoded.block_number); last = Some(decoded); } } if let Some(ref snap) = last { - tracing::debug!("✅ [BSC] Selected fallback snapshot for block {} at block {} in DB (searched {} snapshots)", block_number, snap.block_number, found_count); + tracing::debug!("Succeed to find fallback snapshot for block {} at block {} in DB (searched {} snapshots)", block_number, snap.block_number, found_count); } else { - tracing::debug!("❌ [BSC] No fallback snapshot found for block {} in DB", block_number); + tracing::debug!("Failed to find snapshot for block {} from DB", block_number); } last } fn persist_to_db(&self, snap: &Snapshot) -> Result<(), DatabaseError> { - tracing::debug!("💾 [BSC] Starting DB persist for snapshot block {}", snap.block_number); let tx = self.db.tx_mut()?; tx.put::(snap.block_number, ParliaSnapshotBlob(snap.clone().compress()))?; tx.commit()?; - tracing::debug!("✅ [BSC] Successfully committed snapshot block {} to DB", snap.block_number); + tracing::debug!("Succeed to insert snapshot block {} to DB", snap.block_number); Ok(()) } } @@ -237,18 +174,19 @@ impl SnapshotProvider for DbSnapshotProvider { if snapshot.block_number % crate::consensus::parlia::snapshot::CHECKPOINT_INTERVAL == 0 { match self.persist_to_db(&snapshot) { Ok(()) => { - tracing::debug!("✅ [BSC] Successfully persisted snapshot for block {} to DB", snapshot.block_number); + tracing::debug!("Succeed to persist snapshot for block {} to DB", snapshot.block_number); }, Err(e) => { - tracing::error!("❌ [BSC] Failed to persist snapshot for block {} to DB: {}", snapshot.block_number, e); + tracing::error!("Failed to persist snapshot for block {} to DB due to {:?}", snapshot.block_number, e); } } } } fn get_checkpoint_header(&self, _block_number: u64) -> Option { + tracing::info!("Get checkpoint header for block {} in DbSnapshotProvider", _block_number); // DbSnapshotProvider doesn't have access to headers - None + unimplemented!("DbSnapshotProvider doesn't have access to headers"); } } @@ -262,7 +200,7 @@ where { let mut cache_guard = self.base.cache.write(); if let Some(cached_snap) = cache_guard.get(&block_number) { - tracing::debug!("✅ [BSC] Cache hit for snapshot request {} -> found snapshot for block {}", block_number, cached_snap.block_number); + tracing::debug!("Succeed to query snapshot from cache, request {} -> found snapshot for block {}", block_number, cached_snap.block_number); return Some(cached_snap.clone()); } } @@ -357,10 +295,7 @@ where let parsed = super::validator::parse_epoch_update(checkpoint_header, self.chain_spec.is_luban_active_at_block(checkpoint_header.number), self.chain_spec.is_bohr_active_at_timestamp(checkpoint_header.timestamp) - ); - - // Validator set parsed from checkpoint header - + ); parsed } else { tracing::warn!("⚠️ [BSC] Checkpoint header for block {} not found in headers_to_apply list", checkpoint_block_number); @@ -391,18 +326,15 @@ where Some(snap) => snap, None => { if header.number % 100000 == 0 { // only log every 100k blocks to reduce spam - tracing::debug!("🔄 [BSC] Failed to apply header {} to snapshot during Bodies stage", header.number); + tracing::debug!("Failed to apply header {} to snapshot during Bodies stage", header.number); } return None; } }; - // Cache intermediate snapshots (like reth-bsc-trail) self.base.cache.write().insert(working_snapshot.block_number, working_snapshot.clone()); - - // Persist checkpoint snapshots to database (like reth-bsc-trail) if working_snapshot.block_number % crate::consensus::parlia::snapshot::CHECKPOINT_INTERVAL == 0 { - // Persisting checkpoint snapshot + tracing::debug!("Succeed to rebuild snapshot for block {} to DB", working_snapshot.block_number); self.base.insert(working_snapshot.clone()); } } @@ -416,12 +348,16 @@ where } fn get_checkpoint_header(&self, block_number: u64) -> Option { + tracing::info!("Get checkpoint header for block {} in enhanced snapshot provider", block_number); // Use the provider to fetch header from database (like reth-bsc-trail's get_header_by_hash) use reth_provider::HeaderProvider; match self.header_provider.header_by_number(block_number) { - Ok(header) => header, + Ok(header) => { + tracing::info!("Succeed to fetch header{} for block {} in enhanced snapshot provider", header.is_none(),block_number); + header + }, Err(e) => { - tracing::error!("❌ [BSC] Failed to fetch header for block {}: {:?}", block_number, e); + tracing::error!("Failed to fetch header for block {}: {:?}", block_number, e); None } } diff --git a/src/consensus/parlia/snapshot.rs b/src/consensus/parlia/snapshot.rs index 7d2426b..d1bc1f8 100644 --- a/src/consensus/parlia/snapshot.rs +++ b/src/consensus/parlia/snapshot.rs @@ -1,5 +1,7 @@ use std::collections::{BTreeMap, HashMap}; +//use crate::consensus::parlia::TURN_LENGTH_SIZE; + use super::vote::{VoteAddress, VoteAttestation, VoteData}; use alloy_primitives::{Address, BlockNumber, B256}; use serde::{Deserialize, Serialize}; @@ -59,7 +61,7 @@ pub struct Snapshot { #[serde(default, skip_serializing_if = "Option::is_none")] pub turn_length: Option, - /// Expected block interval in seconds. + /// Expected block interval in milliseconds. pub block_interval: u64, } @@ -281,14 +283,14 @@ impl Snapshot { /// Validator that should propose the **next** block. pub fn inturn_validator(&self) -> Address { - let turn = u64::from(self.turn_length.unwrap_or(DEFAULT_TURN_LENGTH)); + let turn_length = u64::from(self.turn_length.unwrap_or(DEFAULT_TURN_LENGTH)); let next_block = self.block_number + 1; - let offset = (next_block / turn) as usize % self.validators.len(); + let offset = (next_block / turn_length) as usize % self.validators.len(); let next_validator = self.validators[offset]; tracing::debug!( - "🔢 [BSC] inturn_validator calculation: snapshot_block={}, next_block={}, turn={}, offset={}, validators_len={}, next_validator=0x{:x}", - self.block_number, next_block, turn, offset, self.validators.len(), next_validator + "inturn_validator debug info, snapshot_block={}, next_block={}, turn_length={}, offset={}, validators_len={}, next_validator=0x{:x}", + self.block_number, next_block, turn_length, offset, self.validators.len(), next_validator ); next_validator diff --git a/src/consensus/parlia/tests/snapshot_persistence.rs b/src/consensus/parlia/tests/snapshot_persistence.rs index 1768b27..247c406 100644 --- a/src/consensus/parlia/tests/snapshot_persistence.rs +++ b/src/consensus/parlia/tests/snapshot_persistence.rs @@ -3,7 +3,7 @@ use super::super::{ provider::DbSnapshotProvider, snapshot::Snapshot, - validator::SnapshotProvider, + provider::SnapshotProvider, }; use alloy_primitives::{Address, B256}; use reth_db::{init_db, mdbx::DatabaseArguments, Database, transaction::DbTx, cursor::DbCursorRO}; diff --git a/src/consensus/parlia/util.rs b/src/consensus/parlia/util.rs index 0e3667c..04afbed 100644 --- a/src/consensus/parlia/util.rs +++ b/src/consensus/parlia/util.rs @@ -82,4 +82,24 @@ fn rlp_header(header: &Header, chain_id: u64) -> alloy_rlp::Header { } } rlp_head +} + + +pub fn calculate_millisecond_timestamp(header: &Header) -> u64 { + let seconds = header.timestamp; + let mix_digest = header.mix_hash; + + let ms_part = if mix_digest != B256::ZERO { + let bytes = mix_digest.as_slice(); + // Convert last 8 bytes to u64 (big-endian), equivalent to Go's uint256.SetBytes32().Uint64() + let mut result = 0u64; + for &byte in bytes.iter().skip(24).take(8) { + result = (result << 8) | u64::from(byte); + } + result + } else { + 0 + }; + + seconds * 1000 + ms_part } \ No newline at end of file diff --git a/src/consensus/parlia/validation.rs b/src/consensus/parlia/validation.rs index 58f23d4..73e547b 100644 --- a/src/consensus/parlia/validation.rs +++ b/src/consensus/parlia/validation.rs @@ -9,9 +9,11 @@ use alloy_primitives::{Address, B256, U256}; use alloy_consensus::BlockHeader; use reth::consensus::ConsensusError; use reth_chainspec::EthChainSpec; +//use reth_eth_wire::snap; use reth_primitives_traits::SealedHeader; use std::collections::HashMap; use std::sync::Arc; +use crate::consensus::parlia::util::calculate_millisecond_timestamp; /// BSC consensus validator that implements the missing pre/post execution logic #[derive(Debug, Clone)] @@ -45,6 +47,7 @@ where /// Verify block time for Ramanujan fork /// After Ramanujan activation, blocks must respect specific timing rules + // TODO: refine and fix this function, now bypass backoff time. fn verify_block_time_for_ramanujan( &self, snapshot: &Snapshot, @@ -55,11 +58,14 @@ where let block_interval = snapshot.block_interval; let back_off_time = self.calculate_back_off_time(snapshot, header); - if header.timestamp() < parent.timestamp() + block_interval + back_off_time { + if calculate_millisecond_timestamp(header) < calculate_millisecond_timestamp(parent) + block_interval + back_off_time { return Err(ConsensusError::Other(format!( - "Block time validation failed for Ramanujan fork: block {} timestamp {} too early", + "Block time validation failed for Ramanujan fork: block {} timestamp {} too early, parent_timestamp {}, block_interval {}, backoff_time {}", header.number(), - header.timestamp() + calculate_millisecond_timestamp(header), + calculate_millisecond_timestamp(parent), + block_interval, + back_off_time ))); } } @@ -70,13 +76,16 @@ where fn calculate_back_off_time(&self, snapshot: &Snapshot, header: &SealedHeader) -> u64 { let validator = header.beneficiary(); let is_inturn = snapshot.inturn_validator() == validator; - + if is_inturn { 0 } else { // Out-of-turn validators must wait longer - let turn_length = snapshot.turn_length.unwrap_or(1) as u64; - turn_length * snapshot.block_interval / 2 + // TODO: fix this calculation. + // https://github.com/bnb-chain/reth-bsc-trail/blob/main/crates/bsc/consensus/src/lib.rs#L293 + // let turn_length = snapshot.turn_length.unwrap_or(1) as u64; + // turn_length * snapshot.block_interval / 2 + 0 } } @@ -126,9 +135,13 @@ where if header.difficulty() != U256::from(expected_difficulty) { return Err(ConsensusError::Other(format!( - "Invalid difficulty: expected {}, got {}", + "Invalid difficulty: expected {}, got {}, expected_validator={}, actual_validator={} at block {}, snapshot_block={}", expected_difficulty, - header.difficulty() + header.difficulty(), + snapshot.inturn_validator(), + proposer, + header.number(), + snapshot.block_number ))); } diff --git a/src/consensus/parlia/validator.rs b/src/consensus/parlia/validator.rs index cdaf918..3aeaaf3 100644 --- a/src/consensus/parlia/validator.rs +++ b/src/consensus/parlia/validator.rs @@ -1,4 +1,3 @@ -use super::snapshot::Snapshot; use super::{EXTRA_SEAL, EXTRA_VANITY}; use alloy_primitives::Address; use reth::consensus::{ConsensusError, HeaderValidator}; @@ -93,17 +92,7 @@ where (vals, Some(vote_vals), turn_len) } -/// Very light-weight snapshot provider (trait object) so the header validator can fetch the latest snapshot. -pub trait SnapshotProvider: Send + Sync { - /// Returns the snapshot that is valid for the given `block_number` (usually parent block). - fn snapshot(&self, block_number: u64) -> Option; - /// Inserts (or replaces) the snapshot in the provider. - fn insert(&self, snapshot: Snapshot); - - /// Fetches header by block number for checkpoint parsing (like reth-bsc-trail's get_header_by_hash) - fn get_checkpoint_header(&self, block_number: u64) -> Option; -} /// Header validator for Parlia consensus. /// diff --git a/src/main.rs b/src/main.rs index 1b355da..021d569 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ fn main() -> eyre::Result<()> { Cli::::parse().run_with_components::( |spec| { - // ComponentsBuilder will call BscConsensusBuilder to build the consensus. + // ComponentsBuilder BscConsensusBuilder automatically overwrite it to ParliaConsensus. (BscEvmConfig::new(spec.clone()), NoopConsensus::arc()) }, async move |builder, _| { @@ -36,16 +36,14 @@ fn main() -> eyre::Result<()> { let NodeHandle { node, node_exit_future: exit_future } = builder.node(node) .extend_rpc_modules(move |ctx| { - tracing::info!("Start to Register Parlia RPC API: parlia_getSnapshot"); + tracing::info!("Start to register Parlia RPC API: parlia_getSnapshot"); use reth_bsc::rpc::parlia::{ParliaApiImpl, ParliaApiServer, DynSnapshotProvider}; let snapshot_provider = if let Some(provider) = reth_bsc::shared::get_snapshot_provider() { - tracing::info!("Using shared persistent snapshot provider from consensus builder"); provider.clone() } else { - tracing::error!("Shared snapshot provider not available, using fallback"); - use reth_bsc::consensus::parlia::{InMemorySnapshotProvider, SnapshotProvider}; - Arc::new(InMemorySnapshotProvider::new(1000)) as Arc + tracing::error!("Failed to register Parlia RPC due to can not get snapshot provider"); + return Err(eyre::eyre!("Failed to get snapshot provider")); }; let wrapped_provider = Arc::new(DynSnapshotProvider::new(snapshot_provider)); diff --git a/src/node/consensus.rs b/src/node/consensus.rs index e34d9a4..67f9507 100644 --- a/src/node/consensus.rs +++ b/src/node/consensus.rs @@ -31,16 +31,24 @@ where panic!("Failed to initialize on-demand MDBX snapshots: {}", e); }); - let consensus = ParliaConsensus::new( - ctx.chain_spec(), + let consensus_concrete: ParliaConsensus<_, _> = ParliaConsensus::new( + ctx.chain_spec(), snapshot_provider.clone(), EPOCH, // BSC epoch length (200 blocks) ); - + // Store the snapshot provider globally so RPC can access it - let _ = crate::shared::set_snapshot_provider(snapshot_provider as Arc); - - Ok(Arc::new(consensus)) + let _ = crate::shared::set_snapshot_provider( + snapshot_provider as Arc, + ); + + // Store consensus globally for RPC access as a trait object that also exposes validator API + let consensus_obj_global: Arc = Arc::new(consensus_concrete.clone()); + let _ = crate::shared::set_parlia_consensus(consensus_obj_global); + + // Return the consensus as FullConsensus for the builder API + let consensus_obj: Arc> = Arc::new(consensus_concrete); + Ok(consensus_obj) } } diff --git a/src/node/evm/executor.rs b/src/node/evm/executor.rs index 1dd846d..f97fe21 100644 --- a/src/node/evm/executor.rs +++ b/src/node/evm/executor.rs @@ -39,6 +39,10 @@ use revm::{ use tracing::{debug, trace, warn}; use alloy_eips::eip2935::{HISTORY_STORAGE_ADDRESS, HISTORY_STORAGE_CODE}; use alloy_primitives::keccak256; +use std::sync::Arc; +use crate::consensus::parlia::SnapshotProvider; +// use crate::BscPrimitives; // not needed directly here +// use reth::consensus::{ConsensusError, FullConsensus}; // trait object is via ParliaConsensusObject pub struct BscBlockExecutor<'a, EVM, Spec, R: ReceiptBuilder> where @@ -59,13 +63,19 @@ where /// System contracts used to trigger fork specific logic. system_contracts: SystemContract, /// Hertz patch manager for mainnet compatibility + /// TODO: refine later. + #[allow(dead_code)] hertz_patch_manager: HertzPatchManager, /// Context for block execution. _ctx: EthBlockExecutionCtx<'a>, /// Utility to call system caller. system_caller: SystemCaller, - /// state hook + /// State hook. hook: Option>, + /// Snapshot provider for accessing Parlia validator snapshots. + pub(super) snapshot_provider: Option>, + /// Parlia consensus instance used (optional during execution). + pub(super) parlia_consensus: Option>, } impl<'a, DB, EVM, Spec, R: ReceiptBuilder> BscBlockExecutor<'a, EVM, Spec, R> @@ -109,6 +119,8 @@ where _ctx, system_caller: SystemCaller::new(spec_clone), hook: None, + snapshot_provider: crate::shared::get_snapshot_provider().cloned(), + parlia_consensus: crate::shared::get_parlia_consensus().cloned(), } } @@ -451,69 +463,19 @@ where type Evm = E; fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> { - // Set state clear flag if the block is after the Spurious Dragon hardfork. - let state_clear_flag = - self.spec.is_spurious_dragon_active_at_block(self.evm.block().number.to()); - self.evm.db_mut().set_state_clear_flag(state_clear_flag); + // pre check and prepare some intermediate data for commit parlia snapshot in finish function. + let block_env = self.evm.block().clone(); + self.check_new_block(&block_env)?; - // TODO: (Consensus Verify cascading fields)[https://github.com/bnb-chain/reth/blob/main/crates/bsc/evm/src/pre_execution.rs#L43] - // TODO: (Consensus System Call Before Execution)[https://github.com/bnb-chain/reth/blob/main/crates/bsc/evm/src/execute.rs#L678] + // set state clear flag if the block is after the Spurious Dragon hardfork. + let state_clear_flag = self.spec.is_spurious_dragon_active_at_block(self.evm.block().number.to()); + self.evm.db_mut().set_state_clear_flag(state_clear_flag); if !self.spec.is_feynman_active_at_timestamp(self.evm.block().timestamp.to()) { self.upgrade_contracts()?; } - // ----------------------------------------------------------------- - // reth-bsc-trail PATTERN: Get parent snapshot at start of execution - // This ensures we have the parent snapshot available for the entire execution - // ----------------------------------------------------------------- - use crate::consensus::parlia::{hooks::{ParliaHooks, PreExecutionHook}, snapshot::Snapshot}; - - // Get parent snapshot at start of execution (like reth-bsc-trail does) - let current_block_number = self.evm.block().number.to::(); - let parent_block_number = current_block_number.saturating_sub(1); - - let snap_for_hooks = if let Some(provider) = crate::shared::get_snapshot_provider() { - // Get parent snapshot (like reth-bsc-trail does at start of execution) - match provider.snapshot(parent_block_number) { - Some(parent_snap) => { - tracing::debug!("✅ [BSC] Got parent snapshot for block {} at start of execution (following reth-bsc-trail pattern)", parent_block_number); - parent_snap - }, - None => { - tracing::warn!("⚠️ [BSC] Parent snapshot not available for block {} at start of execution", parent_block_number); - Snapshot::default() - } - } - } else { - tracing::debug!("🔍 [BSC] No global snapshot provider available, using placeholder for hooks"); - Snapshot::default() - }; - let beneficiary = self.evm.block().beneficiary; - - // Assume in-turn for now; detailed check requires snapshot state which will be wired - // later. - let in_turn = true; - - // DEBUG: Uncomment to trace Parlia pre-execution hooks - // debug!("🎯 [BSC] apply_pre_execution_changes: calling Parlia pre-execution hooks, beneficiary={:?}, in_turn={}", - // beneficiary, in_turn); - - let pre_out = (ParliaHooks, &self.system_contracts) - .on_pre_execution(&snap_for_hooks, beneficiary, in_turn); - - // DEBUG: Uncomment to trace Parlia hooks output - // debug!("🎯 [BSC] apply_pre_execution_changes: Parlia hooks returned {} system txs, reserved_gas={}", - // pre_out.system_txs.len(), pre_out.reserved_gas); - - // Queue system-transactions for execution in finish(). - // Note: We don't reserve gas here since we'll execute the actual transactions and count their real gas usage. - self.system_txs.extend(pre_out.system_txs.into_iter()); - - // DEBUG: Uncomment to trace queued system transactions count - // debug!("🎯 [BSC] apply_pre_execution_changes: total queued system txs now: {}", self.system_txs.len()); - - // enable BEP-440/EIP-2935 for historical block hashes from state + // enable BEP-440/EIP-2935 for historical block hashes from state. if self.spec.is_prague_transition_at_timestamp(self.evm.block().timestamp.to(), self.evm.block().timestamp.to::() - 3) { self.apply_history_storage_account(self.evm.block().number.to::())?; } @@ -529,7 +491,7 @@ where _tx: impl ExecutableTx, _f: impl FnOnce(&ExecutionResult<::HaltReason>) -> CommitChanges, ) -> Result, BlockExecutionError> { - Ok(Some(0)) + unimplemented!(); } fn execute_transaction_with_result_closure( @@ -625,6 +587,7 @@ where mut self, ) -> Result<(Self::Evm, BlockExecutionResult), BlockExecutionError> { + self.finalize_new_block(&self.evm.block().clone())?; // TODO: // Consensus: Verify validators diff --git a/src/node/evm/mod.rs b/src/node/evm/mod.rs index 031e9ea..7942833 100644 --- a/src/node/evm/mod.rs +++ b/src/node/evm/mod.rs @@ -24,6 +24,8 @@ mod assembler; pub mod config; pub use config::BscEvmConfig; mod executor; +mod pre_execution; +mod post_execution; mod factory; mod patch; diff --git a/src/node/evm/post_execution.rs b/src/node/evm/post_execution.rs new file mode 100644 index 0000000..6d23c9d --- /dev/null +++ b/src/node/evm/post_execution.rs @@ -0,0 +1,34 @@ +use super::executor::BscBlockExecutor; +use crate::evm::transaction::BscTxEnv; +use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks}; +use reth_evm::{eth::receipt_builder::ReceiptBuilder, execute::BlockExecutionError, Database, Evm, FromRecoveredTx, FromTxWithEncoded, IntoTxEnv}; +use reth_primitives::TransactionSigned; +use reth_revm::State; +use revm::context::BlockEnv; +use alloy_consensus::TxReceipt; + +impl<'a, DB, EVM, Spec, R: ReceiptBuilder> BscBlockExecutor<'a, EVM, Spec, R> +where + DB: Database + 'a, + EVM: Evm< + DB = &'a mut State, + Tx: FromRecoveredTx + + FromRecoveredTx + + FromTxWithEncoded, + >, + Spec: EthereumHardforks + crate::hardforks::BscHardforks + EthChainSpec + Hardforks + Clone, + R: ReceiptBuilder, + ::Transaction: Unpin + From, + ::Tx: FromTxWithEncoded<::Transaction>, + BscTxEnv: IntoTxEnv<::Tx>, + R::Transaction: Into, +{ + /// finalize the new block, post check and finalize the system tx. + /// depends on parlia, header and snapshot. + pub(crate) fn finalize_new_block(&mut self, block: &BlockEnv) -> Result<(), BlockExecutionError> { + // TODO: implement this function. + // unimplemented!(); + tracing::info!("Finalize new block, block_number: {}", block.number); + Ok(()) + } +} \ No newline at end of file diff --git a/src/node/evm/pre_execution.rs b/src/node/evm/pre_execution.rs new file mode 100644 index 0000000..e63da84 --- /dev/null +++ b/src/node/evm/pre_execution.rs @@ -0,0 +1,96 @@ +use super::executor::BscBlockExecutor; +use crate::evm::transaction::BscTxEnv; +use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks}; +use reth_evm::{eth::receipt_builder::ReceiptBuilder, execute::BlockExecutionError, Database, Evm, FromRecoveredTx, FromTxWithEncoded, IntoTxEnv}; +use reth_primitives::TransactionSigned; +use reth_revm::State; +use revm::context::BlockEnv; +use alloy_consensus::TxReceipt; +// use consensus trait object for cascading validation + +impl<'a, DB, EVM, Spec, R: ReceiptBuilder> BscBlockExecutor<'a, EVM, Spec, R> +where + DB: Database + 'a, + EVM: Evm< + DB = &'a mut State, + Tx: FromRecoveredTx + + FromRecoveredTx + + FromTxWithEncoded, + >, + Spec: EthereumHardforks + crate::hardforks::BscHardforks + EthChainSpec + Hardforks + Clone, + R: ReceiptBuilder, + ::Transaction: Unpin + From, + ::Tx: FromTxWithEncoded<::Transaction>, + BscTxEnv: IntoTxEnv<::Tx>, + R::Transaction: Into, +{ + /// check the new block, pre check and prepare some intermediate data for commit parlia snapshot in finish function. + /// depends on parlia, header and snapshot. + pub(crate) fn check_new_block(&mut self, block: &BlockEnv) -> Result<(), BlockExecutionError> { + let block_number = block.number.to::(); + tracing::info!("Check new block, block_number: {}", block_number); + + let header = self + .snapshot_provider + .as_ref() + .unwrap() + .get_checkpoint_header(block_number) + .ok_or(BlockExecutionError::msg("Failed to get header from snapshot provider"))?; + + let parent_header = self + .snapshot_provider + .as_ref() + .unwrap() + .get_checkpoint_header(block_number - 1) + .ok_or(BlockExecutionError::msg("Failed to get parent header from snapshot provider"))?; + + let snap = self + .snapshot_provider + .as_ref() + .unwrap() + .snapshot(block_number-1) + .ok_or(BlockExecutionError::msg("Failed to get snapshot from snapshot provider"))?; + + // Delegate to Parlia consensus object; no ancestors available here, pass None + // TODO: move this part logic codes to executor to ensure parlia is lightly. + let verify_res = self + .parlia_consensus + .as_ref() + .unwrap() + .verify_cascading_fields(&header, &parent_header, None, &snap); + + if let Err(err) = verify_res { + let proposer = header.beneficiary; + let is_inturn = snap.is_inturn(proposer); + let expected_difficulty: u64 = if is_inturn { 2 } else { 1 }; + let recent_counts = snap.count_recent_proposers(); + + tracing::error!( + target: "bsc::pre_execution", + error = ?err, + block_number = header.number, + parent_number = parent_header.number, + header_timestamp = header.timestamp, + parent_timestamp = parent_header.timestamp, + proposer = %format!("0x{:x}", proposer), + validators_len = snap.validators.len(), + epoch_len = snap.epoch_num, + turn_length = snap.turn_length.map(|v| v as u64), + block_interval = snap.block_interval, + is_inturn, + expected_difficulty, + header_difficulty = %format!("{}", header.difficulty), + miner_history_check_len = snap.miner_history_check_len(), + recent_proposers = %format!("{:?}", snap.recent_proposers), + recent_counts = %format!("{:?}", recent_counts), + "Consensus verify_cascading_fields failed with detailed diagnostics" + ); + + return Err(err); + } + + // TODO: query finalise input from parlia consensus object. + + Ok(()) + } +} \ No newline at end of file diff --git a/src/rpc/parlia.rs b/src/rpc/parlia.rs index b48ed85..c541a79 100644 --- a/src/rpc/parlia.rs +++ b/src/rpc/parlia.rs @@ -202,12 +202,24 @@ impl ParliaApiServer for ParliaApiI #[cfg(test)] mod tests { use super::*; - use crate::consensus::parlia::InMemorySnapshotProvider; + use crate::chainspec::{bsc_testnet, BscChainSpec}; + use crate::consensus::parlia::provider::EnhancedDbSnapshotProvider; + use reth_db::test_utils::create_test_rw_db; + use reth_provider::test_utils::NoopProvider; #[tokio::test] async fn test_snapshot_api() { - let snapshot_provider = Arc::new(InMemorySnapshotProvider::new(100)); + // Build an EnhancedDbSnapshotProvider backed by a temp DB and noop header provider + let db = create_test_rw_db(); + let header_provider = Arc::new(NoopProvider::default()); + let chain_spec = Arc::new(BscChainSpec::from(bsc_testnet())); + let snapshot_provider = Arc::new(EnhancedDbSnapshotProvider::new( + db.clone(), + 2048, + header_provider, + chain_spec, + )); // Insert a test snapshot let mut test_snapshot = Snapshot::default(); diff --git a/src/shared.rs b/src/shared.rs index 598c62e..6402291 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -3,12 +3,15 @@ //! This module provides global access to the snapshot provider so that //! both the consensus builder and RPC modules can access the same instance. -use crate::consensus::parlia::SnapshotProvider; +use crate::consensus::parlia::{SnapshotProvider, ParliaConsensusObject}; +// use reth::consensus::ConsensusError; // not needed in this module use std::sync::{Arc, OnceLock}; /// Global shared access to the snapshot provider for RPC static SNAPSHOT_PROVIDER: OnceLock> = OnceLock::new(); +static PARLIA_CONSENSUS: OnceLock> = OnceLock::new(); + /// Store the snapshot provider globally pub fn set_snapshot_provider(provider: Arc) -> Result<(), Arc> { SNAPSHOT_PROVIDER.set(provider) @@ -17,4 +20,14 @@ pub fn set_snapshot_provider(provider: Arc) /// Get the global snapshot provider pub fn get_snapshot_provider() -> Option<&'static Arc> { SNAPSHOT_PROVIDER.get() +} + +/// Store the parlia consensus globally +pub fn set_parlia_consensus(consensus: Arc) -> Result<(), Arc> { + PARLIA_CONSENSUS.set(consensus) +} + +/// Get the global parlia consensus +pub fn get_parlia_consensus() -> Option<&'static Arc> { + PARLIA_CONSENSUS.get() } \ No newline at end of file