diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index c04d668fc5..96386c9b29 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -44,7 +44,7 @@ pub struct BestBlockInfo { /// The block number (for pruning purposes). pub block_number: ::Number, /// Reorg info if this block became best as part of a reorganization. - pub reorg_info: Option>, + pub reorg_info: Option>>, } pub struct MappingSyncWorker { @@ -139,7 +139,7 @@ where if notification.is_new_best { // For notification: include new_best_hash per Ethereum spec. let reorg_info = notification.tree_route.as_ref().map(|tree_route| { - ReorgInfo::from_tree_route(tree_route, notification.hash) + Arc::new(ReorgInfo::from_tree_route(tree_route, notification.hash)) }); self.best_at_import.insert( notification.hash, diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 24d824e0c0..0af2927b56 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -26,6 +26,10 @@ pub mod sql; use sp_blockchain::TreeRoute; use sp_consensus::SyncOracle; use sp_runtime::traits::Block as BlockT; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; #[derive(Copy, Clone, Eq, PartialEq)] pub enum SyncStrategy { @@ -36,6 +40,18 @@ pub enum SyncStrategy { pub type EthereumBlockNotificationSinks = parking_lot::Mutex>>; +/// Default hard cap for pending notifications per subscriber channel. +/// Subscribers above this threshold are considered lagging and are dropped. +const DEFAULT_MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER: usize = 512; + +static MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER: AtomicUsize = + AtomicUsize::new(DEFAULT_MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER); + +/// Configure the hard cap for pending notifications per subscriber channel. +pub fn set_max_pending_notifications_per_subscriber(max_pending: usize) { + MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER.store(max_pending.max(1), Ordering::Relaxed); +} + /// Information about a chain reorganization. /// /// When a reorg occurs, this struct contains the blocks that were removed from @@ -87,7 +103,7 @@ pub struct EthereumBlockNotification { pub is_new_best: bool, pub hash: Block::Hash, /// Optional reorg information. Present when this block became best as part of a reorg. - pub reorg_info: Option>, + pub reorg_info: Option>>, } /// Context for emitting block notifications. @@ -99,7 +115,7 @@ pub struct BlockNotificationContext { /// Whether this block is the new best block. pub is_new_best: bool, /// Optional reorg information if this block became best as part of a reorg. - pub reorg_info: Option>, + pub reorg_info: Option>>, } /// Emit block notification to all registered sinks. @@ -124,6 +140,18 @@ pub fn emit_block_notification( } sinks.retain(|sink| { + let max_pending = MAX_PENDING_NOTIFICATIONS_PER_SUBSCRIBER.load(Ordering::Relaxed); + if sink.len() >= max_pending { + log::debug!( + target: "mapping-sync", + "Dropping lagging pubsub subscriber (pending={}, max={})", + sink.len(), + max_pending, + ); + let _ = sink.close(); + return false; + } + sink.unbounded_send(EthereumBlockNotification { is_new_best: context.is_new_best, hash: context.hash, diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index f6cfb18039..7a31c46ea0 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -48,7 +48,7 @@ pub enum WorkerCommand> { block_hash: H256, /// Whether this block was the new best at import time. is_new_best: bool, - reorg_info: Option>, + reorg_info: Option>>, }, /// Canonicalize the enacted and retracted blocks reported via import notifications. Canonicalize { @@ -251,7 +251,7 @@ where "🔀 Re-org happened at new best {}, proceeding to canonicalize db", notification.hash ); - let info = ReorgInfo::from_tree_route(tree_route, notification.hash); + let info = Arc::new(ReorgInfo::from_tree_route(tree_route, notification.hash)); // Note: new_best is handled separately by IndexBestBlock. tx.send(WorkerCommand::Canonicalize { common: info.common_ancestor, diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 47de38a059..f352d53772 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -19,7 +19,7 @@ use std::{marker::PhantomData, sync::Arc}; use ethereum::TransactionV3 as EthereumTransaction; -use futures::{future, FutureExt as _, StreamExt as _}; +use futures::{future, stream::BoxStream, FutureExt as _, StreamExt as _}; use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink}; use log::debug; // Substrate @@ -119,21 +119,43 @@ where } } - /// Get headers for enacted blocks during a reorg, including the new best block. - /// - /// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads): - /// "When a chain reorganization occurs, this subscription will emit an event - /// containing all new headers (blocks) for the new chain. This means that you - /// may see multiple headers emitted with the same height (block number)." - /// - /// Returns headers in ascending order (oldest first), with `new_best` last. - fn get_reorg_headers(&self, enacted: &[B::Hash], new_best: B::Hash) -> Vec { - enacted - .iter() - .chain(std::iter::once(&new_best)) - .filter_map(|hash| self.storage_override.current_block(*hash)) - .map(PubSubResult::header) - .collect() + /// Convert a block notification into a stream of `newHeads` items. + /// For reorgs this emits enacted headers followed by the new best block. + fn new_heads_from_notification( + &self, + notification: EthereumBlockNotification, + ) -> BoxStream<'static, PubSubResult> { + if !notification.is_new_best { + return futures::stream::empty().boxed(); + } + + if let Some(reorg_info) = notification.reorg_info { + debug!( + target: "eth-pubsub", + "Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted", + reorg_info.new_best, + reorg_info.retracted.len(), + reorg_info.enacted.len() + ); + + let pubsub = self.clone(); + let enacted = reorg_info.enacted.clone(); + let new_best = reorg_info.new_best; + return futures::stream::iter( + enacted + .into_iter() + .chain(std::iter::once(new_best)) + .filter_map(move |hash| pubsub.storage_override.current_block(hash)) + .map(PubSubResult::header), + ) + .boxed(); + } + + let maybe_header = self + .storage_override + .current_block(notification.hash) + .map(PubSubResult::header); + futures::stream::iter(maybe_header).boxed() } fn notify_logs( @@ -261,43 +283,10 @@ where // Per Ethereum spec, when a reorg occurs, we must emit all headers // for the new canonical chain. The reorg_info field in the notification // contains the enacted blocks when a reorg occurred. - let stream = block_notification_stream.filter_map(move |notification| { - if !notification.is_new_best { - return future::ready(None); - } - - // Check if this block came from a reorg - let headers = if let Some(ref reorg_info) = notification.reorg_info { - debug!( - target: "eth-pubsub", - "Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted", - reorg_info.new_best, - reorg_info.retracted.len(), - reorg_info.enacted.len() - ); - // Emit all enacted blocks followed by the new best block - pubsub.get_reorg_headers(&reorg_info.enacted, reorg_info.new_best) - } else { - // Normal case: just emit the new block - if let Some(block) = - pubsub.storage_override.current_block(notification.hash) - { - vec![PubSubResult::header(block)] - } else { - return future::ready(None); - } - }; - - if headers.is_empty() { - return future::ready(None); - } - - future::ready(Some(headers)) + let flat_stream = block_notification_stream.flat_map(move |notification| { + pubsub.new_heads_from_notification(notification) }); - // Flatten the Vec into individual PubSubResult items - let flat_stream = stream.flat_map(futures::stream::iter); - PendingSubscription::from(pending) .pipe_from_stream(flat_stream, BoundedVecDeque::new(16)) .await @@ -330,9 +319,13 @@ where // in case of reorg, the first event is emitted right away. let syncing_status = pubsub.syncing_status().await; let subscription = Subscription::from(sink); - let _ = subscription + if subscription .send(&PubSubResult::SyncingStatus(syncing_status)) - .await; + .await + .is_err() + { + return; + } // When the node is not under a major syncing (i.e. from genesis), react // normally to import notifications. @@ -344,9 +337,13 @@ where let syncing_status = pubsub.sync.is_major_syncing(); if syncing_status != last_syncing_status { let syncing_status = pubsub.syncing_status().await; - let _ = subscription + if subscription .send(&PubSubResult::SyncingStatus(syncing_status)) - .await; + .await + .is_err() + { + break; + } } last_syncing_status = syncing_status; } diff --git a/template/node/src/eth.rs b/template/node/src/eth.rs index aa2f47aab0..6425807336 100644 --- a/template/node/src/eth.rs +++ b/template/node/src/eth.rs @@ -94,6 +94,10 @@ pub struct EthConfiguration { /// Default value is 200MB. #[arg(long, default_value = "209715200")] pub frontier_sql_backend_cache_size: u64, + + /// Maximum pending pubsub notifications per subscriber before it is dropped. + #[arg(long, default_value = "512")] + pub pubsub_max_pending_notifications: usize, } pub struct FrontierPartialComponents { diff --git a/template/node/src/service.rs b/template/node/src/service.rs index 6e6af378df..baee221810 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -407,6 +407,9 @@ where // Everytime a new subscription is created, a new mpsc channel is added to the sink pool. // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel. // This way we avoid race conditions when using native substrate block import notification stream. + fc_mapping_sync::set_max_pending_notifications_per_subscriber( + eth_config.pubsub_max_pending_notifications, + ); let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks< fc_mapping_sync::EthereumBlockNotification, > = Default::default();