Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ jobs:
run: sudo apt-get update && sudo apt-get install -y liburing-dev pkg-config libclang-dev
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
toolchain: nightly-2026-05-11
components: rustfmt
override: true
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-udeps
- name: Run cargo udeps
run: cargo +nightly udeps --workspace --lib --examples --tests --benches --all-features --locked
run: cargo +nightly-2026-05-11 udeps --workspace --lib --examples --tests --benches --all-features --locked

clippy:
name: clippy
Expand Down
15 changes: 13 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,19 @@ codegen-units = 16
inherits = "release"
lto = "fat"
codegen-units = 1
# Keep line-tables so flamegraph frames carry Rust function names. Without this
# the maxperf binary is stripped (inherited from [profile.release]) and every
# reth-bsc frame collapses to a single `[reth-bsc]` bar in the SVG.
debug = "line-tables-only"
strip = false
split-debuginfo = "off"

# Apply the same to every transitive dependency so triedb/revm/rayon frames
# also resolve. Without this, only reth-bsc's own symbols show up and the
# whole state-root path still looks opaque.
[profile.maxperf.package."*"]
debug = "line-tables-only"
strip = false

[dev-dependencies]
uuid = { version = "1", features = ["v4"] }


52 changes: 41 additions & 11 deletions src/consensus/parlia/block_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
//! Mirrors geth's `BlockStats` / `reportRecentBlocksLoop` functionality by tracking
//! block timestamps and event timestamps to compute chain delay metrics.

use alloy_consensus::Header;
use alloy_primitives::B256;
use lru::LruCache;
use once_cell::sync::Lazy;
use std::{num::NonZero, sync::RwLock};

use crate::consensus::parlia::util::calculate_millisecond_timestamp;
use crate::metrics::BscChainDelayMetrics;

/// Size of the block stats LRU cache.
Expand All @@ -18,7 +20,9 @@ const DEFAULT_MAJORITY_THRESHOLD: usize = 14;

/// Per-block tracking data for chain delay metrics.
struct BlockStat {
/// Block timestamp in milliseconds (header.timestamp * 1000).
/// Block timestamp in milliseconds, from `calculate_millisecond_timestamp(header)`
/// (combines `header.timestamp` seconds with the Lorentz-era ms portion stored in
/// `header.mix_hash`).
block_timestamp_ms: i64,
/// Whether the first vote delay has been reported.
first_vote_reported: bool,
Expand All @@ -38,23 +42,49 @@ fn now_ms() -> i64 {
.as_millis() as i64
}

/// Register a block's timestamp when it is first received from the network.
/// Also records the `chain.delay.block_recv` metric (delay from block creation to reception).
pub fn on_block_received(block_hash: B256, block_timestamp_secs: u64) {
let block_ts_ms = block_timestamp_secs as i64 * 1000;
/// Stores the millisecond-precision timestamp of `header` in the block-stats cache under
/// `block_hash` and returns the freshly computed timestamp.
///
/// A new entry starts with both vote-reported flags cleared. This helper never records the
/// `chain.delay.block_recv` histogram; callers that want that sample record it themselves.
///
/// The timestamp is obtained from `calculate_millisecond_timestamp`: on Lorentz and later
/// forks the block time is split between `header.timestamp` (seconds) and `header.mix_hash`
/// (millisecond part), so using it keeps delays from being biased by the 0-999 ms remainder.
fn cache_block_timestamp(block_hash: B256, header: &Header) -> i64 {
    let timestamp_ms = calculate_millisecond_timestamp(header) as i64;

    // The write guard is a temporary that is released at the end of this statement.
    BLOCK_STATS
        .write()
        .expect("block stats poisoned")
        .get_or_insert(block_hash, || BlockStat {
            block_timestamp_ms: timestamp_ms,
            first_vote_reported: false,
            majority_vote_reported: false,
        });

    timestamp_ms
}

/// Network-receive entry point: caches the block's timestamp and samples the
/// `chain.delay.block_recv` metric with the time elapsed between block creation and now.
///
/// A sample is recorded only when that elapsed time is non-negative; a block whose timestamp
/// lies ahead of the local clock is cached but produces no sample.
///
/// Locally mined blocks should go through [`register_self_mined_block`] instead. Feeding them
/// here would add samples that measure local mining/finalize latency rather than true network
/// propagation delay (mirrors geth-bsc, where `RecvNewBlockTime` is only set in
/// `handleBlockBroadcast`).
pub fn on_block_received(block_hash: B256, header: &Header) {
    // Cache first, then read the clock, so the sample includes the caching step.
    let created_at_ms = cache_block_timestamp(block_hash, header);
    let elapsed_ms = now_ms() - created_at_ms;

    if elapsed_ms < 0 {
        return;
    }
    CHAIN_DELAY_METRICS.block_recv.record(elapsed_ms as f64);
}

let mut cache = BLOCK_STATS.write().expect("block stats poisoned");
cache.get_or_insert(block_hash, || BlockStat {
block_timestamp_ms: block_ts_ms,
first_vote_reported: false,
majority_vote_reported: false,
});
/// Caches a locally mined block's timestamp so later `on_vote_received` calls can compute
/// vote delays for it, while deliberately leaving `chain.delay.block_recv` untouched.
///
/// This follows geth-bsc's split between `SendBlockTime` (miner path) and `RecvNewBlockTime`
/// (network path): votes for our own block still count toward `vote_first` /
/// `vote_majority`, but the block-recv histogram stays free of self-mined samples so it can
/// be used to diagnose cross-region network propagation delays.
pub fn register_self_mined_block(block_hash: B256, header: &Header) {
    // Only the caching side effect is wanted; the returned timestamp is intentionally unused.
    let _ = cache_block_timestamp(block_hash, header);
}

/// Called when a vote is added for a block. Records first-vote and majority-vote delay
Expand Down
45 changes: 35 additions & 10 deletions src/consensus/parlia/vote_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap, HashSet},
num::NonZero,
sync::RwLock,
sync::{
atomic::{AtomicU64, Ordering},
RwLock,
},
};

use alloy_primitives::{BlockNumber, B256};
Expand Down Expand Up @@ -220,6 +223,10 @@ impl VotePool {
/// Global singleton pool.
static VOTE_POOL: Lazy<RwLock<VotePool>> = Lazy::new(|| RwLock::new(VotePool::new()));

/// Highest block number against which the pool has already been pruned.
/// Throttles [`put_vote`]'s lazy prune to once per observed head advance.
///
/// Starts at 0. [`put_vote`] prunes only when the best canonical block number it observes
/// is greater than this value, and afterwards raises the value with `fetch_max`, so it
/// never moves backwards.
static LAST_PRUNED_BLOCK: AtomicU64 = AtomicU64::new(0);

/// Global metrics for vote operations.
static VOTE_METRICS: Lazy<BscVoteMetrics> = Lazy::new(BscVoteMetrics::default);

Expand All @@ -244,8 +251,35 @@ pub fn put_vote(vote: VoteEnvelope) {
// Get pending block number for malicious vote detection scope
let pending_block_number = shared::get_best_canonical_block_number().unwrap_or(0);

// Lazy prune: evict votes below `head - LOWER_LIMIT_OF_VOTE_BLOCK_NUMBER`
// once per observed head advance. Replaces geth-bsc's chain-head event
// subscription by piggybacking on the vote ingest path (same cadence).
// `fetch_max` keeps the watermark monotonic across racing writers; the
// inner prune is effectively a no-op when nothing is stale.
let need_prune = pending_block_number > LAST_PRUNED_BLOCK.load(Ordering::Relaxed);

let target_number = vote.data.target_number;

let mut pool = VOTE_POOL.write().expect("vote pool poisoned");
let votes_for_block = pool.insert(vote, pending_block_number);
if need_prune {
pool.prune(pending_block_number);
LAST_PRUNED_BLOCK.fetch_max(pending_block_number, Ordering::Relaxed);
}

// Force prune if pool is too large, prevents memory issues during stage sync.
const MAX_VOTES_IN_POOL: usize = 32 * 1024 * 2;
if pool.len() > MAX_VOTES_IN_POOL {
let force_prune = target_number.saturating_sub(LOWER_LIMIT_OF_VOTE_BLOCK_NUMBER);
pool.prune(force_prune);
tracing::debug!(
target: "bsc::vote_pool",
pool_size = pool.len(),
force_prune_block_number = force_prune,
"Vote pool oversized, force pruned"
);
}

let size = pool.len();
drop(pool);
update_vote_pool_size_metric(size);
Expand Down Expand Up @@ -295,15 +329,6 @@ pub fn fetch_vote_by_block_hash_and_source_number(
.fetch_vote_by_block_hash_and_source_number(block_hash, source_number)
}

/// Prunes the global vote pool against `latest_block_number` and then publishes the
/// resulting pool size to the vote-pool size metric.
pub fn prune(latest_block_number: BlockNumber) {
    // Scope the write guard so the lock is released before the metric is updated.
    let remaining = {
        let mut guard = VOTE_POOL.write().expect("vote pool poisoned");
        guard.prune(latest_block_number);
        guard.len()
    };
    update_vote_pool_size_metric(remaining);
}

fn maybe_notify_finality(target_hash: B256, votes_for_block: usize) {
// Check if we've already notified for this block (de-duplication)
{
Expand Down
6 changes: 1 addition & 5 deletions src/node/miner/bsc_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::node::miner::bid_simulator::{BidRuntime, BidSimulator};
use crate::node::miner::payload::BscBuildArguments;
use crate::{
chainspec::BscChainSpec,
consensus::parlia::{provider::SnapshotProvider, vote_pool, Parlia},
consensus::parlia::{provider::SnapshotProvider, Parlia},
metrics::BscConsensusMetrics,
node::{
engine::BscBuiltPayload,
Expand Down Expand Up @@ -247,10 +247,6 @@ where
}

let tip_header = tip.clone_sealed_header();
// Prune old votes from the vote pool based on the new block number
let block_number =
self.provider.last_block_number().ok().unwrap_or(tip_header.number());
vote_pool::prune(block_number);

// Produce and broadcast a local vote for this new canonical head, if eligible
if let Some(sp) = crate::shared::get_snapshot_provider() {
Expand Down
44 changes: 33 additions & 11 deletions src/node/miner/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,8 +1270,26 @@ where
trace_id = self.trace_id,
block_number = self.build_args.config.parent_header.number() + 1,
wait_ms = initial_wait.as_millis(),
"Applying out-of-turn backoff before starting payload build"
"Applying out-of-turn backoff; starting speculative build to warm TrieDB prefetcher"
);

// Kick off a speculative build before sleeping so the TrieDB prefetcher
// can warm the storage slots state-root will need. Without this the
// prefetcher only starts after the backoff ends, leaving ~one slot for
// both cache warm-up and state-root computation over thousands of txs —
// which repeatedly times out and degrades the block to EmptyFallback.
// The spawned build's result is picked up by the outer loop's
// join_next() branch, so the try_build_tx kickoff below is skipped when
// a speculative build is already in flight.
self.retries += 1;
start_time = std::time::Instant::now();
{
let builder = self.builder.clone();
let build_args = self.build_args.clone();
self.join_handle
.spawn(async move { builder.build_payload(build_args).await });
}

tokio::select! {
_ = tokio::time::sleep(initial_wait) => {}
_ = &mut self.abort_rx => {
Expand All @@ -1287,16 +1305,20 @@ where
// after the wait completes.
self.job_start_time = std::time::Instant::now();

if let Err(err) = self.try_build_tx.send(()) {
warn!(
target: "bsc::miner::payload",
trace_id = self.trace_id,
block_number = self.build_args.config.parent_header.number() + 1,
is_inturn = self.mining_ctx.is_inturn,
error = %err,
"Failed to send to first try build queue"
);
return Err(Box::new(BscPayloadJobError::BuildQueueSendError(err.to_string())));
// Skip the normal first-build kickoff if a speculative build from the
// out-of-turn backoff is already running or has completed into the JoinSet.
if self.join_handle.is_empty() {
if let Err(err) = self.try_build_tx.send(()) {
warn!(
target: "bsc::miner::payload",
trace_id = self.trace_id,
block_number = self.build_args.config.parent_header.number() + 1,
is_inturn = self.mining_ctx.is_inturn,
error = %err,
"Failed to send to first try build queue"
);
return Err(Box::new(BscPayloadJobError::BuildQueueSendError(err.to_string())));
}
}

loop {
Expand Down
Loading
Loading