Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
805 changes: 435 additions & 370 deletions Cargo.lock

Large diffs are not rendered by default.

45 changes: 35 additions & 10 deletions src/consensus/parlia/vote_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
cmp::Reverse,
collections::{BinaryHeap, HashMap, HashSet},
num::NonZero,
sync::RwLock,
sync::{
atomic::{AtomicU64, Ordering},
RwLock,
},
};

use alloy_primitives::{BlockNumber, B256};
Expand Down Expand Up @@ -220,6 +223,10 @@ impl VotePool {
/// Global singleton pool.
static VOTE_POOL: Lazy<RwLock<VotePool>> = Lazy::new(|| RwLock::new(VotePool::new()));

/// Highest block number against which the pool has already been pruned.
/// Throttles [`put_vote`]'s lazy prune to once per observed head advance.
static LAST_PRUNED_BLOCK: AtomicU64 = AtomicU64::new(0);

/// Global metrics for vote operations.
static VOTE_METRICS: Lazy<BscVoteMetrics> = Lazy::new(BscVoteMetrics::default);

Expand All @@ -244,8 +251,35 @@ pub fn put_vote(vote: VoteEnvelope) {
// Get pending block number for malicious vote detection scope
let pending_block_number = shared::get_best_canonical_block_number().unwrap_or(0);

// Lazy prune: evict votes below `head - LOWER_LIMIT_OF_VOTE_BLOCK_NUMBER`
// once per observed head advance. Replaces geth-bsc's chain-head event
// subscription by piggybacking on the vote ingest path (same cadence).
// `fetch_max` keeps the watermark monotonic across racing writers; the
// inner prune is O(0) when nothing is stale.
let need_prune = pending_block_number > LAST_PRUNED_BLOCK.load(Ordering::Relaxed);

let target_number = vote.data.target_number;

let mut pool = VOTE_POOL.write().expect("vote pool poisoned");
let votes_for_block = pool.insert(vote, pending_block_number);
if need_prune {
pool.prune(pending_block_number);
LAST_PRUNED_BLOCK.fetch_max(pending_block_number, Ordering::Relaxed);
}

// Force prune if pool is too large, prevents memory issues during stage sync.
const MAX_VOTES_IN_POOL: usize = 32 * 1024 * 2;
if pool.len() > MAX_VOTES_IN_POOL {
let force_prune = target_number.saturating_sub(LOWER_LIMIT_OF_VOTE_BLOCK_NUMBER);
pool.prune(force_prune);
tracing::debug!(
target: "bsc::vote_pool",
pool_size = pool.len(),
force_prune_block_number = force_prune,
"Vote pool oversized, force pruned"
);
}

let size = pool.len();
drop(pool);
update_vote_pool_size_metric(size);
Expand Down Expand Up @@ -295,15 +329,6 @@ pub fn fetch_vote_by_block_hash_and_source_number(
.fetch_vote_by_block_hash_and_source_number(block_hash, source_number)
}

/// Prune old votes based on the latest block number.
pub fn prune(latest_block_number: BlockNumber) {
let mut pool = VOTE_POOL.write().expect("vote pool poisoned");
pool.prune(latest_block_number);
let size = pool.len();
drop(pool);
update_vote_pool_size_metric(size);
}

fn maybe_notify_finality(target_hash: B256, votes_for_block: usize) {
// Check if we've already notified for this block (de-duplication)
{
Expand Down
6 changes: 1 addition & 5 deletions src/node/miner/bsc_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::node::miner::bid_simulator::{BidRuntime, BidSimulator};
use crate::node::miner::payload::BscBuildArguments;
use crate::{
chainspec::BscChainSpec,
consensus::parlia::{provider::SnapshotProvider, vote_pool, Parlia},
consensus::parlia::{provider::SnapshotProvider, Parlia},
metrics::BscConsensusMetrics,
node::{
engine::BscBuiltPayload,
Expand Down Expand Up @@ -246,10 +246,6 @@ where
}

let tip_header = tip.clone_sealed_header();
// Prune old votes from the vote pool based on the new block number
let block_number =
self.provider.last_block_number().ok().unwrap_or(tip_header.number());
vote_pool::prune(block_number);

// Produce and broadcast a local vote for this new canonical head, if eligible
if let Some(sp) = crate::shared::get_snapshot_provider() {
Expand Down
Loading
Loading