Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/chain_sync/bad_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use nonzero_ext::nonzero;

use crate::utils::{cache::SizeTrackingLruCache, get_size};

/// Default capacity for CID caches (32768 entries).
/// That's about 4 MiB.
const DEFAULT_CID_CACHE_CAPACITY: NonZeroUsize = nonzero!(1usize << 15);

/// Thread-safe cache for tracking bad blocks.
/// This cache is checked before validating a block, to ensure no duplicate
/// work.
Expand All @@ -18,7 +22,7 @@ pub struct BadBlockCache {

impl Default for BadBlockCache {
fn default() -> Self {
Self::new(nonzero!(1usize << 15))
Self::new(DEFAULT_CID_CACHE_CAPACITY)
}
}

Expand All @@ -40,3 +44,30 @@ impl BadBlockCache {
self.cache.peek_cloned(&(*c).into())
}
}

/// Thread-safe LRU cache for tracking recently seen gossip block CIDs.
/// Used to de-duplicate gossip blocks before expensive message fetching.
#[derive(Debug, Clone)]
pub struct SeenBlockCache {
cache: SizeTrackingLruCache<get_size::CidWrapper, ()>,
}

impl Default for SeenBlockCache {
fn default() -> Self {
Self::new(DEFAULT_CID_CACHE_CAPACITY)
}
}

impl SeenBlockCache {
pub fn new(cap: NonZeroUsize) -> Self {
Self {
cache: SizeTrackingLruCache::new_with_metrics("seen_gossip_block".into(), cap),
}
}

/// Returns `true` if the CID was already present (duplicate).
/// Always inserts/refreshes the entry.
pub fn test_and_insert(&self, c: &Cid) -> bool {
self.cache.push((*c).into(), ()).is_some()
}
}
26 changes: 24 additions & 2 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use crate::{
chain::ChainStore,
chain_sync::{
ForkSyncInfo, ForkSyncStage, SyncStatus, SyncStatusReport, TipsetValidator,
bad_block_cache::BadBlockCache,
bad_block_cache::{BadBlockCache, SeenBlockCache},
metrics,
tipset_syncer::{TipsetSyncerError, validate_tipset},
validation::GossipBlockValidator,
},
libp2p::{NetworkEvent, PubsubMessage, hello::HelloRequest},
message_pool::MessagePool,
Expand Down Expand Up @@ -146,6 +147,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
stateless_mode,
)));
let tasks: Arc<Mutex<HashSet<SyncTask>>> = Arc::new(Mutex::new(HashSet::default()));
let seen_block_cache = SeenBlockCache::default();

let mut set = JoinSet::new();

Expand All @@ -155,6 +157,8 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
let state_changed = state_changed.clone();
let state_machine = state_machine.clone();
let network = network.clone();
let bad_block_cache = bad_block_cache.clone();
let seen_block_cache = seen_block_cache.clone();
async move {
while let Ok(event) = network_rx.recv_async().await {
inc_gossipsub_event_metrics(&event);
Expand All @@ -180,8 +184,26 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
}
NetworkEvent::PubsubMessage { message } => match message {
PubsubMessage::Block(b) => {
let cs = state_manager.chain_store();
let cfg = cs.chain_config();
if let Err(reason) = GossipBlockValidator::new(&b).validate_pre_fetch(
&genesis,
cfg.block_delay_secs,
cfg.policy.chain_finality,
cs.heaviest_tipset().epoch(),
bad_block_cache.as_deref(),
&seen_block_cache,
) {
metrics::GOSSIP_BLOCK_REJECTED_TOTAL
.get_or_create(&metrics::GossipRejectReasonLabel {
reason: reason.label(),
})
.inc();
debug!("Rejected gossip block {}: {reason}", b.header.cid());
continue;
}
let key = TipsetKey::from(nunny::vec![*b.header.cid()]);
get_full_tipset(&network, state_manager.chain_store(), None, &key).await
get_full_tipset(&network, cs, None, &key).await
}
PubsubMessage::Message(m) => {
if let Err(why) = mem_pool.add(m) {
Expand Down
15 changes: 15 additions & 0 deletions src/chain_sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,25 @@ pub static INVALID_TIPSET_TOTAL: LazyLock<Counter> = LazyLock::new(|| {
);
metric
});
pub static GOSSIP_BLOCK_REJECTED_TOTAL: LazyLock<Family<GossipRejectReasonLabel, Counter>> =
LazyLock::new(|| {
let metric = Family::default();
crate::metrics::default_registry().register(
"gossip_block_rejected_total",
"Total number of gossip blocks rejected by pre-validation",
metric.clone(),
);
metric
});

#[derive(Clone, Debug, Hash, PartialEq, Eq, derive_more::Constructor)]
pub struct Libp2pMessageKindLabel(&'static str);

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet, derive_more::Constructor)]
pub struct GossipRejectReasonLabel {
pub reason: &'static str,
}

impl EncodeLabelSet for Libp2pMessageKindLabel {
fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), std::fmt::Error> {
let mut label_encoder = encoder.encode_label();
Expand Down
Loading
Loading