diff --git a/Cargo.lock b/Cargo.lock index 1d461214233..aa5ce0d226d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7429,6 +7429,7 @@ dependencies = [ "reth-stages", "reth-static-file", "tokio", + "tokio-stream", "tracing", ] @@ -7763,6 +7764,7 @@ dependencies = [ "reth-rpc-types", "reth-rpc-types-compat", "reth-tasks", + "reth-tokio-util", "reth-tracing", "reth-transaction-pool", "serde", @@ -7797,6 +7799,7 @@ dependencies = [ "reth-rpc-types", "reth-rpc-types-compat", "reth-tasks", + "reth-tokio-util", "serde", "thiserror", "tokio", @@ -7937,6 +7940,7 @@ dependencies = [ "reth-stages", "reth-tokio-util", "tempfile", + "tokio", "tokio-stream", "tracing", ] @@ -7972,6 +7976,7 @@ version = "0.2.0-beta.7" dependencies = [ "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 8fecc928ab0..34b52dd5760 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -11,7 +11,7 @@ use crate::{ utils::get_single_header, }; use clap::Parser; -use futures::{stream::select as stream_select, StreamExt}; +use futures::stream::select as stream_select; use reth_beacon_consensus::EthBeaconConsensus; use reth_cli_runner::CliContext; use reth_config::{config::EtlConfig, Config}; @@ -259,8 +259,8 @@ impl Command { let pipeline_events = pipeline.events(); let events = stream_select( - network.event_listener().map(Into::into), - pipeline_events.map(Into::into), + reth_node_events::node::handle_broadcast_stream(network.event_listener()), + reth_node_events::node::handle_broadcast_stream(pipeline_events), ); ctx.task_executor.spawn_critical( "events task", diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 7d6b12fd8f0..5561c38b71c 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -11,7 +11,7 @@ use crate::{ }; use clap::Parser; use eyre::Context; -use futures::{Stream, StreamExt}; +use futures::Stream; use reth_beacon_consensus::EthBeaconConsensus; use reth_config::{config::EtlConfig, Config}; use reth_consensus::Consensus; @@ -257,7 +257,7 @@ where let max_block = file_client.max_block().unwrap_or(0); - let mut pipeline = Pipeline::builder() + let pipeline = Pipeline::builder() .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty .with_max_block(max_block) @@ -277,7 +277,7 @@ where ) .build(provider_factory, static_file_producer); - let events = pipeline.events().map(Into::into); + let events = reth_node_events::node::handle_broadcast_stream(pipeline.events()); Ok((pipeline, events)) } diff --git a/crates/consensus/auto-seal/src/lib.rs b/crates/consensus/auto-seal/src/lib.rs index e954108c8c4..67a3f6e5ac7 100644 --- a/crates/consensus/auto-seal/src/lib.rs +++ b/crates/consensus/auto-seal/src/lib.rs @@ -36,7 +36,7 @@ use std::{ sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; -use tokio::sync::{mpsc::UnboundedSender, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::sync::{mpsc::Sender, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::trace; mod client; @@ -97,7 +97,7 @@ pub struct AutoSealBuilder { pool: Pool, mode: MiningMode, storage: Storage, - to_engine: UnboundedSender>, + to_engine: Sender>, canon_state_notification: CanonStateNotificationSender, evm_config: EvmConfig, } @@ -115,7 +115,7 @@ where chain_spec: Arc, client: Client, pool: Pool, - to_engine: UnboundedSender>, + to_engine: Sender>, canon_state_notification: CanonStateNotificationSender, mode: MiningMode, evm_config: EvmConfig, diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 42f1268f331..e3d83f08926 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -17,8 +17,8 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{mpsc::Sender, oneshot}; +use tokio_stream::wrappers::BroadcastStream; use tracing::{debug, error, warn}; /// A Future that listens for new ready transactions and puts new blocks into storage @@ -30,7 +30,7 @@ pub struct MiningTask>>>, + insert_task: Option>>>, /// Shared storage to insert new blocks storage: Storage, /// Pool where transactions are stored @@ -38,11 +38,11 @@ pub struct MiningTask::Transaction>>>>, // TODO: ideally this would just be a sender of hashes - to_engine: UnboundedSender>, + to_engine: Sender>, /// Used to notify consumers of new blocks canon_state_notification: CanonStateNotificationSender, /// The pipeline events to listen on - pipe_line_events: Option>, + pipe_line_events: Option>, /// The type used for block execution block_executor: Executor, } @@ -57,7 +57,7 @@ impl pub(crate) fn new( chain_spec: Arc, miner: MiningMode, - to_engine: UnboundedSender>, + to_engine: Sender>, canon_state_notification: CanonStateNotificationSender, storage: Storage, client: Client, @@ -80,7 +80,7 @@ impl } /// Sets the pipeline events to listen on. - pub fn set_pipeline_events(&mut self, events: UnboundedReceiverStream) { + pub fn set_pipeline_events(&mut self, events: BroadcastStream) { self.pipe_line_events = Some(events); } } @@ -166,11 +166,16 @@ where // send the new update to the engine, this will trigger the engine // to download and execute the block we just inserted let (tx, rx) = oneshot::channel(); - let _ = to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs: None, - tx, - }); + if let Err(err) = to_engine + .send(BeaconEngineMessage::ForkchoiceUpdated { + state, + payload_attrs: None, + tx, + }) + .await + { + warn!("sending to consensus bounded channel: {err}"); + } debug!(target: "consensus::auto", ?state, "Sent fork choice update"); match rx.await.unwrap() { diff --git a/crates/consensus/beacon/src/engine/handle.rs b/crates/consensus/beacon/src/engine/handle.rs index 121a8fac070..14409f14ac6 100644 --- a/crates/consensus/beacon/src/engine/handle.rs +++ b/crates/consensus/beacon/src/engine/handle.rs @@ -10,8 +10,10 @@ use reth_interfaces::RethResult; use reth_rpc_types::engine::{ CancunPayloadFields, ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus, }; -use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use reth_tokio_util::EventListeners; +use tokio::sync::{mpsc::Sender, oneshot}; +use tokio_stream::wrappers::BroadcastStream; +use tracing::warn; /// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus /// engine task. @@ -22,7 +24,8 @@ pub struct BeaconConsensusEngineHandle where Engine: EngineTypes, { - pub(crate) to_engine: UnboundedSender>, + pub(crate) to_engine: Sender>, + event_listeners: EventListeners, } impl Clone for BeaconConsensusEngineHandle @@ -30,7 +33,7 @@ where Engine: EngineTypes, { fn clone(&self) -> Self { - Self { to_engine: self.to_engine.clone() } + Self { to_engine: self.to_engine.clone(), event_listeners: self.event_listeners.clone() } } } @@ -41,8 +44,11 @@ where Engine: EngineTypes, { /// Creates a new beacon consensus engine handle. - pub fn new(to_engine: UnboundedSender>) -> Self { - Self { to_engine } + pub fn new( + to_engine: Sender>, + event_listeners: EventListeners, + ) -> Self { + Self { to_engine, event_listeners } } /// Sends a new payload message to the beacon consensus engine and waits for a response. @@ -54,7 +60,13 @@ where cancun_fields: Option, ) -> Result { let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }); + if let Err(err) = self + .to_engine + .send(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) + .await + { + warn!("sending to consensus bounded channel: {err}"); + } rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)? } @@ -68,6 +80,7 @@ where ) -> Result { Ok(self .send_fork_choice_updated(state, payload_attrs) + .await .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable) .await?? .await?) @@ -75,17 +88,19 @@ where /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to /// wait for a response. - fn send_fork_choice_updated( + async fn send_fork_choice_updated( &self, state: ForkchoiceState, payload_attrs: Option, ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); - let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated { - state, - payload_attrs, - tx, - }); + if let Err(err) = self + .to_engine + .send(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) + .await + { + warn!("sending to consensus bounded channel: {err}"); + } rx } @@ -93,13 +108,15 @@ where /// /// See also pub async fn transition_configuration_exchanged(&self) { - let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); + if let Err(err) = + self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged).await + { + warn!("sending to consensus bounded channel: {err}"); + } } /// Creates a new [`BeaconConsensusEngineEvent`] listener stream. - pub fn event_listener(&self) -> UnboundedReceiverStream { - let (tx, rx) = mpsc::unbounded_channel(); - let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx)); - UnboundedReceiverStream::new(rx) + pub fn event_listener(&self) -> BroadcastStream { + self.event_listeners.new_listener() } } diff --git a/crates/consensus/beacon/src/engine/hooks/static_file.rs b/crates/consensus/beacon/src/engine/hooks/static_file.rs index 2cff68e1d26..01b7056c37f 100644 --- a/crates/consensus/beacon/src/engine/hooks/static_file.rs +++ b/crates/consensus/beacon/src/engine/hooks/static_file.rs @@ -91,8 +91,7 @@ impl StaticFileHook { return Ok(None) }; - let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc() - else { + let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else { trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken"); return Ok(None) }; diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index f9f1a84d46f..108dab41eb0 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -1,7 +1,4 @@ -use crate::{ - engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus}, - BeaconConsensusEngineEvent, -}; +use crate::engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus}; use futures::{future::Either, FutureExt}; use reth_engine_primitives::EngineTypes; use reth_interfaces::RethResult; @@ -15,7 +12,7 @@ use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tokio::sync::oneshot; /// Represents the outcome of forkchoice update. /// @@ -162,6 +159,4 @@ pub enum BeaconEngineMessage { }, /// Message with exchanged transition configuration. TransitionConfigurationExchanged, - /// Add a new listener for [`BeaconEngineMessage`]. - EventListener(UnboundedSender), } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 1057457c779..53a6b6a4d73 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -37,10 +37,10 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::{ - mpsc::{self, UnboundedSender}, + mpsc::{self, Sender}, oneshot, }; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::ReceiverStream; use tracing::*; mod message; @@ -89,6 +89,9 @@ const MAX_INVALID_HEADERS: u32 = 512u32; /// If the distance exceeds this threshold, the pipeline will be used for sync. pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS; +/// The size of the channel used to receive consensus events sent from the RPC. +pub const DEFAULT_CONSENSUS_CHANNEL_SIZE: usize = 1000; + /// The beacon consensus engine is the driver that switches between historical and live sync. /// /// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are @@ -236,7 +239,7 @@ where pipeline_run_threshold: u64, hooks: EngineHooks, ) -> RethResult<(Self, BeaconConsensusEngineHandle)> { - let (to_engine, rx) = mpsc::unbounded_channel(); + let (to_engine, rx) = mpsc::channel(DEFAULT_CONSENSUS_CHANNEL_SIZE); Self::with_channel( client, pipeline, @@ -249,7 +252,7 @@ where target, pipeline_run_threshold, to_engine, - Box::pin(UnboundedReceiverStream::from(rx)), + Box::pin(ReceiverStream::from(rx)), hooks, ) } @@ -278,12 +281,12 @@ where payload_builder: PayloadBuilderHandle, target: Option, pipeline_run_threshold: u64, - to_engine: UnboundedSender>, + to_engine: Sender>, engine_message_stream: BoxStream<'static, BeaconEngineMessage>, hooks: EngineHooks, ) -> RethResult<(Self, BeaconConsensusEngineHandle)> { - let handle = BeaconConsensusEngineHandle { to_engine }; let listeners = EventListeners::default(); + let handle = BeaconConsensusEngineHandle::new(to_engine, listeners.clone()); let sync = EngineSyncController::new( pipeline, client, @@ -597,13 +600,6 @@ where self.handle.clone() } - /// Pushes an [UnboundedSender] to the engine's listeners. Also pushes an [UnboundedSender] to - /// the sync controller's listeners. - pub(crate) fn push_listener(&mut self, listener: UnboundedSender) { - self.listeners.push_listener(listener.clone()); - self.sync.push_listener(listener); - } - /// Returns true if the distance from the local tip to the block is greater than the configured /// threshold. /// @@ -1878,7 +1874,6 @@ where BeaconEngineMessage::TransitionConfigurationExchanged => { this.blockchain.on_transition_configuration_exchanged(); } - BeaconEngineMessage::EventListener(tx) => this.push_listener(tx), } continue } diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 09c6d208b6e..423cbef6b0e 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -21,7 +21,7 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; -use tokio::sync::{mpsc::UnboundedSender, oneshot}; +use tokio::sync::oneshot; use tracing::trace; /// Manages syncing under the control of the engine. @@ -127,11 +127,6 @@ where self.run_pipeline_continuously } - /// Pushes an [UnboundedSender] to the sync controller's listeners. - pub(crate) fn push_listener(&mut self, listener: UnboundedSender) { - self.listeners.push_listener(listener); - } - /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. #[allow(dead_code)] pub(crate) fn is_pipeline_sync_pending(&self) -> bool { diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 92e9b316a9a..341057a0599 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -2,11 +2,11 @@ use futures_util::StreamExt; use reth::network::{NetworkEvent, NetworkEvents, NetworkHandle, PeersInfo}; use reth_primitives::NodeRecord; use reth_tracing::tracing::info; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::BroadcastStream; /// Helper for network operations pub struct NetworkTestContext { - network_events: UnboundedReceiverStream, + network_events: BroadcastStream, network: NetworkHandle, } @@ -22,7 +22,7 @@ impl NetworkTestContext { self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr()); match self.network_events.next().await { - Some(NetworkEvent::PeerAdded(_)) => (), + Some(Ok(NetworkEvent::PeerAdded(_))) => (), _ => panic!("Expected a peer added event"), } } @@ -35,7 +35,7 @@ impl NetworkTestContext { /// Expects a session to be established pub async fn expect_session(&mut self) { match self.network_events.next().await { - Some(NetworkEvent::SessionEstablished { remote_addr, .. }) => { + Some(Ok(NetworkEvent::SessionEstablished { remote_addr, .. })) => { info!(?remote_addr, "Session established") } _ => panic!("Expected session established event"), diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index d516625c640..8e20408130e 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -528,9 +528,7 @@ where /// Handler for received messages from a handle fn on_handle_message(&mut self, msg: NetworkHandleMessage) { match msg { - NetworkHandleMessage::EventListener(tx) => { - self.event_listeners.push_listener(tx); - } + NetworkHandleMessage::EventListener(tx) => self.event_listeners.set_sender(tx), NetworkHandleMessage::DiscoveryListener(tx) => { self.swarm.state_mut().discovery_mut().add_listener(tx); } diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 86669bf19f4..c490838aeb7 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -16,6 +16,7 @@ use reth_network_api::{ use reth_network_types::PeerId; use reth_primitives::{Head, NodeRecord, TransactionSigned, B256}; use reth_rpc_types::NetworkStatus; +use reth_tokio_util::EventListeners; use secp256k1::SecretKey; use std::{ net::SocketAddr, @@ -24,8 +25,12 @@ use std::{ Arc, }, }; -use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{ + broadcast::Sender, + mpsc::{self, UnboundedSender}, + oneshot, +}; +use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; /// A _shareable_ network frontend. Used to interact with the network. /// @@ -68,7 +73,11 @@ impl NetworkHandle { chain_id, tx_gossip_disabled, discv4, + event_listeners: Default::default(), }; + let tx = inner.event_listeners.clone_sender(); + let _ = inner.to_manager_tx.send(NetworkHandleMessage::EventListener(tx)); + Self { inner: Arc::new(inner) } } @@ -196,10 +205,8 @@ impl NetworkHandle { // === API Implementations === impl NetworkEvents for NetworkHandle { - fn event_listener(&self) -> UnboundedReceiverStream { - let (tx, rx) = mpsc::unbounded_channel(); - let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); - UnboundedReceiverStream::new(rx) + fn event_listener(&self) -> BroadcastStream { + self.inner.event_listeners.new_listener() } fn discovery_listener(&self) -> UnboundedReceiverStream { @@ -401,12 +408,14 @@ struct NetworkInner { tx_gossip_disabled: bool, /// The instance of the discv4 service discv4: Option, + /// All listeners for high level network events. + event_listeners: EventListeners, } /// Provides event subscription for the network. pub trait NetworkEvents: Send + Sync { /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> UnboundedReceiverStream; + fn event_listener(&self) -> BroadcastStream; /// Returns a new [`DiscoveryEvent`] stream. /// /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. @@ -431,7 +440,7 @@ pub(crate) enum NetworkHandleMessage { /// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason. DisconnectPeer(PeerId, Option), /// Adds a new listener for `NetworkEvent`. - EventListener(UnboundedSender), + EventListener(Sender), /// Broadcasts an event to announce a new block to all nodes. AnnounceBlock(NewBlock, B256), /// Sends a list of transactions to the given peer. diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index a92934c0cbc..7c464798aab 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -40,7 +40,7 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::BroadcastStream; /// A test network consisting of multiple peers. pub struct Testnet { @@ -503,7 +503,7 @@ impl PeerHandle { } /// Creates a new [`NetworkEvent`] listener channel. - pub fn event_listener(&self) -> UnboundedReceiverStream { + pub fn event_listener(&self) -> BroadcastStream { self.network.event_listener() } @@ -591,14 +591,14 @@ impl Default for PeerConfig { /// This makes it easier to await established connections #[derive(Debug)] pub struct NetworkEventStream { - inner: UnboundedReceiverStream, + inner: BroadcastStream, } // === impl NetworkEventStream === impl NetworkEventStream { /// Create a new [`NetworkEventStream`] from the given network event receiver stream. - pub fn new(inner: UnboundedReceiverStream) -> Self { + pub fn new(inner: BroadcastStream) -> Self { Self { inner } } @@ -606,7 +606,9 @@ impl NetworkEventStream { pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option)> { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)), + Ok(NetworkEvent::SessionClosed { peer_id, reason }) => { + return Some((peer_id, reason)) + } _ => continue, } } @@ -617,7 +619,7 @@ impl NetworkEventStream { pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id), + Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => return Some(peer_id), _ => continue, } } @@ -632,7 +634,7 @@ impl NetworkEventStream { let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => { peers.push(peer_id); num -= 1; if num == 0 { @@ -650,12 +652,12 @@ impl NetworkEventStream { /// session. pub async fn peer_added_and_established(&mut self) -> Option { let peer_id = match self.inner.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => peer_id, + Some(Ok(NetworkEvent::PeerAdded(peer_id))) => peer_id, _ => return None, }; match self.inner.next().await { - Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => { + Some(Ok(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. })) => { debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"); Some(peer_id) } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index ee14e4c82f0..9b091e43e8a 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -47,8 +47,10 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; -use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; -use tracing::{debug, trace}; +use tokio_stream::wrappers::{ + errors::BroadcastStreamRecvError, BroadcastStream, ReceiverStream, UnboundedReceiverStream, +}; +use tracing::{debug, error, trace}; /// Aggregation on configurable parameters for [`TransactionsManager`]. pub mod config; @@ -198,7 +200,7 @@ pub struct TransactionsManager { /// Subscriptions to all network related events. /// /// From which we get all new incoming transaction related messages. - network_events: UnboundedReceiverStream, + network_events: BroadcastStream, /// Transaction fetcher to handle inflight and missing transaction requests. transaction_fetcher: TransactionFetcher, /// All currently pending transactions grouped by peers. @@ -883,15 +885,19 @@ where } /// Handles a received event related to common network events. - fn on_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionClosed { peer_id, .. } => { + fn on_network_event(&mut self, event_result: Result) { + match event_result { + Ok(NetworkEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); } - NetworkEvent::SessionEstablished { - peer_id, client_version, messages, version, .. - } => { + Ok(NetworkEvent::SessionEstablished { + peer_id, + client_version, + messages, + version, + .. + }) => { // Insert a new peer into the peerset. let peer = PeerMetadata::new(messages, version, client_version); let peer = match self.peers.entry(peer_id) { @@ -926,6 +932,7 @@ where let msg = msg_builder.build(); self.network.send_transactions_hashes(peer_id, msg); } + Err(e) => error!("{e}"), _ => {} } } @@ -1718,7 +1725,7 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { + Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1726,9 +1733,9 @@ mod tests { messages, status, version, - } => { + }) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { + transactions.on_network_event(Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1736,12 +1743,13 @@ mod tests { messages, status, version, - }) + })) } - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + Ok(NetworkEvent::PeerAdded(_peer_id)) => continue, + Ok(ev) => { panic!("unexpected event {ev:?}") } + Err(err) => panic!("unexpected error {err:?}"), } } // random tx: @@ -1804,7 +1812,7 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { + Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1812,9 +1820,9 @@ mod tests { messages, status, version, - } => { + }) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { + transactions.on_network_event(Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1822,12 +1830,13 @@ mod tests { messages, status, version, - }) + })) } - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + Ok(NetworkEvent::PeerAdded(_peer_id)) => continue, + Ok(ev) => { panic!("unexpected event {ev:?}") } + Err(err) => panic!("unexpected error {err:?}"), } } // random tx: @@ -1888,7 +1897,7 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { + Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1896,9 +1905,9 @@ mod tests { messages, status, version, - } => { + }) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { + transactions.on_network_event(Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1906,12 +1915,13 @@ mod tests { messages, status, version, - }) + })) } - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + Ok(NetworkEvent::PeerAdded(_peer_id)) => continue, + Ok(ev) => { panic!("unexpected event {ev:?}") } + Err(err) => panic!("unexpected error {err:?}"), } } // random tx: @@ -1979,7 +1989,7 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { + Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1987,7 +1997,7 @@ mod tests { messages, status, version, - } => transactions.on_network_event(NetworkEvent::SessionEstablished { + }) => transactions.on_network_event(Ok(NetworkEvent::SessionEstablished { peer_id, remote_addr, client_version, @@ -1995,11 +2005,12 @@ mod tests { messages, status, version, - }), - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + })), + Ok(NetworkEvent::PeerAdded(_peer_id)) => continue, + Ok(ev) => { panic!("unexpected event {ev:?}") } + Err(err) => panic!("unexpected error {err:?}"), } } handle.terminate().await; diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 7b9c785ebdb..29e81cbc5d2 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -54,18 +54,19 @@ async fn test_establish_connections() { let mut established = listener0.take(4); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionClosed { .. } => { + Ok(NetworkEvent::SessionClosed { .. }) => { panic!("unexpected event") } - NetworkEvent::SessionEstablished { peer_id, .. } => { + Ok(NetworkEvent::SessionEstablished { peer_id, .. }) => { assert!(expected_connections.remove(&peer_id)) } - NetworkEvent::PeerAdded(peer_id) => { + Ok(NetworkEvent::PeerAdded(peer_id)) => { assert!(expected_peers.remove(&peer_id)) } - NetworkEvent::PeerRemoved(_) => { + Ok(NetworkEvent::PeerRemoved(_)) => { panic!("unexpected event") } + Err(e) => panic!("error: {e}"), } } assert!(expected_connections.is_empty()); @@ -210,8 +211,13 @@ async fn test_connect_with_boot_nodes() { let mut events = handle.event_listener(); tokio::task::spawn(network); - while let Some(ev) = events.next().await { - dbg!(ev); + while let Some(result_ev) = events.next().await { + match result_ev { + Ok(ev) => { + dbg!(ev); + } + Err(err) => eprintln!("{err}"), + } } } @@ -246,8 +252,13 @@ async fn test_connect_with_builder() { } }); - while let Some(ev) = events.next().await { - dbg!(ev); + while let Some(result_ev) = events.next().await { + match result_ev { + Ok(ev) => { + dbg!(ev); + } + Err(err) => eprintln!("{err}"), + } } } @@ -302,8 +313,13 @@ async fn test_connect_to_trusted_peer() { dbg!(&headers); - while let Some(ev) = events.next().await { - dbg!(ev); + while let Some(result_ev) = events.next().await { + match result_ev { + Ok(ev) => { + dbg!(ev); + } + Err(err) => eprintln!("{err}"), + } } } @@ -493,11 +509,11 @@ async fn test_geth_disconnect() { handle.add_peer(geth_peer_id, geth_socket); match events.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id), + Some(Ok(NetworkEvent::PeerAdded(peer_id))) => assert_eq!(peer_id, geth_peer_id), _ => panic!("Expected a peer added event"), } - if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { + if let Some(Ok(NetworkEvent::SessionEstablished { peer_id, .. })) = events.next().await { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session established event"); @@ -507,7 +523,7 @@ async fn test_geth_disconnect() { handle.disconnect_peer(geth_peer_id); // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await { + if let Some(Ok(NetworkEvent::SessionClosed { peer_id, .. })) = events.next().await { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session closed event"); diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 0442cd4319b..fbe424c7db0 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -28,10 +28,10 @@ async fn test_session_established_with_highest_version() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + Ok(NetworkEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + Ok(NetworkEvent::SessionEstablished { peer_id, status, .. }) => { assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth68 as u8); } @@ -66,10 +66,10 @@ async fn test_session_established_with_different_capability() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + Ok(NetworkEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + Ok(NetworkEvent::SessionEstablished { peer_id, status, .. }) => { assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth66 as u8); } diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index 68a7775f0e6..a37925d5845 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -136,18 +136,19 @@ async fn test_sending_invalid_transactions() { // await disconnect for bad tx spam if let Some(ev) = peer1_events.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, .. } => { + Ok(NetworkEvent::SessionClosed { peer_id, .. }) => { assert_eq!(peer_id, *peer0.peer_id()); } - NetworkEvent::SessionEstablished { .. } => { + Ok(NetworkEvent::SessionEstablished { .. }) => { panic!("unexpected SessionEstablished event") } - NetworkEvent::PeerAdded(_) => { + Ok(NetworkEvent::PeerAdded(_)) => { panic!("unexpected PeerAdded event") } - NetworkEvent::PeerRemoved(_) => { + Ok(NetworkEvent::PeerRemoved(_)) => { panic!("unexpected PeerRemoved event") } + Err(e) => panic!("unexpected error: {e}"), } } diff --git a/crates/node-core/src/engine/engine_store.rs b/crates/node-core/src/engine/engine_store.rs index 524e2c89bc2..0937a00f5b7 100644 --- a/crates/node-core/src/engine/engine_store.rs +++ b/crates/node-core/src/engine/engine_store.rs @@ -89,8 +89,7 @@ impl EngineMessageStore { )?; } // noop - BeaconEngineMessage::TransitionConfigurationExchanged | - BeaconEngineMessage::EventListener(_) => (), + BeaconEngineMessage::TransitionConfigurationExchanged => (), }; Ok(()) } diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index ece149e31dc..902d7f957a6 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -4,6 +4,7 @@ use crate::{ builder::{NodeAdapter, NodeAddOns, NodeTypesAdapter}, components::{NodeComponents, NodeComponentsBuilder}, hooks::NodeHooks, + launch::node::handle_broadcast_stream, node::FullNode, BuilderContext, NodeBuilderWithComponents, NodeHandle, }; @@ -11,7 +12,7 @@ use futures::{future, future::Either, stream, stream_select, StreamExt}; use reth_auto_seal_consensus::AutoSealConsensus; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook, StaticFileHook}, - BeaconConsensusEngine, EthBeaconConsensus, + BeaconConsensusEngine, EthBeaconConsensus, DEFAULT_CONSENSUS_CHANNEL_SIZE, }; use reth_blockchain_tree::{ noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, @@ -34,8 +35,11 @@ use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, info}; use reth_transaction_pool::TransactionPool; use std::{future::Future, sync::Arc}; -use tokio::sync::{mpsc::unbounded_channel, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{ + mpsc::{channel, unbounded_channel}, + oneshot, +}; +use tokio_stream::wrappers::ReceiverStream; pub mod common; pub use common::LaunchContext; @@ -257,10 +261,10 @@ where // create pipeline let network_client = node_adapter.network().fetch_client().await?; - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + let (consensus_engine_tx, consensus_engine_rx) = channel(DEFAULT_CONSENSUS_CHANNEL_SIZE); let node_config = ctx.node_config(); - let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + let consensus_engine_stream = ReceiverStream::from(consensus_engine_rx) .maybe_skip_fcu(node_config.debug.skip_fcu) .maybe_skip_new_payload(node_config.debug.skip_new_payload) // Store messages _after_ skipping so that `replay-engine` command @@ -282,7 +286,7 @@ where // Configure the pipeline let pipeline_exex_handle = exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); - let (mut pipeline, client) = if ctx.is_dev() { + let (pipeline, client) = if ctx.is_dev() { info!(target: "reth::cli", "Starting Reth in dev mode"); for (idx, (address, alloc)) in ctx.chain_spec().genesis.alloc.iter().enumerate() { @@ -305,7 +309,7 @@ where ) .build(); - let mut pipeline = crate::setup::build_networked_pipeline( + let pipeline = crate::setup::build_networked_pipeline( ctx.node_config(), &ctx.toml_config().stages, client.clone(), @@ -358,7 +362,7 @@ where pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); } - let mut pruner = pruner_builder.build(ctx.provider_factory().clone()); + let pruner = pruner_builder.build(ctx.provider_factory().clone()); let pruner_events = pruner.events(); info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); @@ -383,19 +387,18 @@ where info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select!( - node_adapter.components.network().event_listener().map(Into::into), - beacon_engine_handle.event_listener().map(Into::into), - pipeline_events.map(Into::into), + handle_broadcast_stream(node_adapter.components.network().event_listener()), + handle_broadcast_stream(beacon_engine_handle.event_listener()), + handle_broadcast_stream(pipeline_events), if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { - Either::Left( - ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) - .map(Into::into), - ) + Either::Left(handle_broadcast_stream( + ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())).map(Ok), + )) } else { Either::Right(stream::empty()) }, - pruner_events.map(Into::into), - static_file_producer_events.map(Into::into) + handle_broadcast_stream(pruner_events), + handle_broadcast_stream(static_file_producer_events) ); ctx.task_executor().spawn_critical( "events task", diff --git a/crates/node/events/Cargo.toml b/crates/node/events/Cargo.toml index 83f2bd13aec..7faf4f7fa84 100644 --- a/crates/node/events/Cargo.toml +++ b/crates/node/events/Cargo.toml @@ -24,6 +24,7 @@ reth-rpc-types.workspace = true # async tokio.workspace = true +tokio-stream.workspace = true # async futures.workspace = true diff --git a/crates/node/events/src/node.rs b/crates/node/events/src/node.rs index ba7ae8da460..b17ae12a6fc 100644 --- a/crates/node/events/src/node.rs +++ b/crates/node/events/src/node.rs @@ -1,7 +1,7 @@ //! Support for handling events emitted by node components. use crate::cl::ConsensusLayerHealthEvent; -use futures::Stream; +use futures::{Stream, StreamExt}; use reth_beacon_consensus::{ BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus, }; @@ -25,6 +25,7 @@ use std::{ time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tokio::time::Interval; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, info, warn}; /// Interval of reporting node state. @@ -392,6 +393,9 @@ pub enum NodeEvent { Pruner(PrunerEvent), /// A static_file_producer event StaticFileProducer(StaticFileProducerEvent), + /// Used to encapsulate various conditions or situations that do not + /// naturally fit into the other more specific variants. + Other(String), } impl From for NodeEvent { @@ -575,6 +579,9 @@ where NodeEvent::StaticFileProducer(event) => { this.state.handle_static_file_producer_event(event); } + NodeEvent::Other(event_description) => { + warn!("{event_description}"); + } } } @@ -664,6 +671,21 @@ impl Display for Eta { } } +/// Transforms a stream of `Result` into a stream of `NodeEvent`, +/// applying a uniform error handling and conversion strategy. +pub fn handle_broadcast_stream( + stream: impl Stream> + Unpin, +) -> impl Stream + Unpin +where + T: Into, +{ + stream.map(|result_event| { + result_event + .map(Into::into) + .unwrap_or_else(|err| NodeEvent::Other(format!("Stream error: {:?}", err))) + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 55a998709d8..43703c46332 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -19,7 +19,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::watch; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::BroadcastStream; use tracing::debug; /// Result of [Pruner::run] execution. @@ -82,7 +82,7 @@ impl Pruner { } /// Listen for events on the pruner. - pub fn events(&mut self) -> UnboundedReceiverStream { + pub fn events(&self) -> BroadcastStream { self.listeners.new_listener() } diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 9087ff7c7ff..e3b5f4766d9 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -56,6 +56,7 @@ reth-rpc-types.workspace = true reth-rpc-types-compat.workspace = true reth-tracing.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } +reth-tokio-util.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } serde_json.workspace = true diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index a3272ac026a..c34c7c255af 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -1,4 +1,6 @@ -use reth_beacon_consensus::BeaconConsensusEngineHandle; +use reth_beacon_consensus::{ + BeaconConsensusEngineEvent, BeaconConsensusEngineHandle, DEFAULT_CONSENSUS_CHANNEL_SIZE, +}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm_ethereum::EthEvmConfig; use reth_network_api::noop::NoopNetwork; @@ -13,9 +15,10 @@ use reth_rpc_builder::{ use reth_rpc_engine_api::EngineApi; use reth_rpc_layer::JwtSecret; use reth_tasks::TokioTaskExecutor; +use reth_tokio_util::EventListeners; use reth_transaction_pool::test_utils::{TestPool, TestPoolBuilder}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::channel; /// Localhost with port 0 so a free port is used. pub fn test_address() -> SocketAddr { @@ -25,8 +28,10 @@ pub fn test_address() -> SocketAddr { /// Launches a new server for the auth module pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { let config = AuthServerConfig::builder(secret).socket_addr(test_address()).build(); - let (tx, _rx) = unbounded_channel(); - let beacon_engine_handle = BeaconConsensusEngineHandle::::new(tx); + let (tx, _rx) = channel(DEFAULT_CONSENSUS_CHANNEL_SIZE); + let listeners: EventListeners = Default::default(); + let beacon_engine_handle = BeaconConsensusEngineHandle::::new(tx, listeners); + let engine_api = EngineApi::new( NoopProvider::default(), MAINNET.clone(), diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index 5fe782a6ef5..83a5f85fcfa 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -43,6 +43,7 @@ reth-ethereum-engine-primitives.workspace = true reth-interfaces = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-payload-builder = { workspace = true, features = ["test-utils"] } +reth-tokio-util.workspace = true alloy-rlp.workspace = true diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 0e4476bb71b..2ceaf5cf89c 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -770,7 +770,9 @@ where mod tests { use super::*; use assert_matches::assert_matches; - use reth_beacon_consensus::BeaconEngineMessage; + use reth_beacon_consensus::{ + BeaconConsensusEngineEvent, BeaconEngineMessage, DEFAULT_CONSENSUS_CHANNEL_SIZE, + }; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_interfaces::test_utils::generators::random_block; use reth_payload_builder::test_utils::spawn_test_payload_service; @@ -778,19 +780,21 @@ mod tests { use reth_provider::test_utils::MockEthProvider; use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block; use reth_tasks::TokioTaskExecutor; - use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; + use reth_tokio_util::EventListeners; + use tokio::sync::mpsc::{channel, Receiver}; fn setup_engine_api() -> (EngineApiTestHandle, EngineApi, EthEngineTypes>) { let chain_spec: Arc = MAINNET.clone(); let provider = Arc::new(MockEthProvider::default()); let payload_store = spawn_test_payload_service(); - let (to_engine, engine_rx) = unbounded_channel(); + let (to_engine, engine_rx) = channel(DEFAULT_CONSENSUS_CHANNEL_SIZE); + let listeners: EventListeners = Default::default(); let task_executor = Box::::default(); let api = EngineApi::new( provider.clone(), chain_spec.clone(), - BeaconConsensusEngineHandle::new(to_engine), + BeaconConsensusEngineHandle::new(to_engine, listeners), payload_store.into(), task_executor, ); @@ -801,7 +805,7 @@ mod tests { struct EngineApiTestHandle { chain_spec: Arc, provider: Arc, - from_api: UnboundedReceiver>, + from_api: Receiver>, } #[tokio::test] diff --git a/crates/stages-api/src/error.rs b/crates/stages-api/src/error.rs index 37fe2b3fdbc..f6e528ca754 100644 --- a/crates/stages-api/src/error.rs +++ b/crates/stages-api/src/error.rs @@ -1,3 +1,4 @@ +use crate::PipelineEvent; use reth_consensus::ConsensusError; use reth_interfaces::{ db::DatabaseError as DbError, executor, p2p::error::DownloadError, RethError, @@ -5,9 +6,7 @@ use reth_interfaces::{ use reth_primitives::{BlockNumber, SealedHeader, StaticFileSegment, TxNumber}; use reth_provider::ProviderError; use thiserror::Error; - -use crate::PipelineEvent; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast::error::SendError; /// Represents the specific error type within a block error. #[derive(Error, Debug)] diff --git a/crates/stages-api/src/pipeline/mod.rs b/crates/stages-api/src/pipeline/mod.rs index 5aceb515b79..cf7e23c7e50 100644 --- a/crates/stages-api/src/pipeline/mod.rs +++ b/crates/stages-api/src/pipeline/mod.rs @@ -20,7 +20,7 @@ use reth_static_file::StaticFileProducer; use reth_tokio_util::EventListeners; use std::pin::Pin; use tokio::sync::watch; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::BroadcastStream; use tracing::*; mod builder; @@ -108,7 +108,7 @@ where } /// Listen for events on the pipeline. - pub fn events(&mut self) -> UnboundedReceiverStream { + pub fn events(&self) -> BroadcastStream { self.listeners.new_listener() } @@ -251,7 +251,7 @@ where /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the /// lock is occupied. pub fn move_to_static_files(&self) -> RethResult<()> { - let mut static_file_producer = self.static_file_producer.lock(); + let static_file_producer = self.static_file_producer.lock(); // Copies data from database to static files let lowest_static_file_height = { @@ -313,6 +313,7 @@ where "Unwind point too far for stage" ); self.listeners.notify(PipelineEvent::Skipped { stage_id }); + continue } @@ -360,6 +361,7 @@ where } Err(err) => { self.listeners.notify(PipelineEvent::Error { stage_id }); + return Err(PipelineError::Stage(StageError::Fatal(Box::new(err)))) } } @@ -417,6 +419,7 @@ where if let Err(err) = stage.execute_ready(exec_input).await { self.listeners.notify(PipelineEvent::Error { stage_id }); + match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? { Some(ctrl) => return Ok(ctrl), None => continue, @@ -472,6 +475,7 @@ where Err(err) => { drop(provider_rw); self.listeners.notify(PipelineEvent::Error { stage_id }); + if let Some(ctrl) = on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? { @@ -592,7 +596,7 @@ mod tests { }; use reth_primitives::PruneModes; use reth_provider::test_utils::create_test_provider_factory; - use tokio_stream::StreamExt; + use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, StreamExt}; #[test] fn record_progress_calculates_outliers() { @@ -653,42 +657,42 @@ mod tests { // Check that the stages were run in order assert_eq!( - events.collect::>().await, + events.collect::>>().await, vec![ - PipelineEvent::Prepare { + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, + }), ] ); } @@ -736,97 +740,97 @@ mod tests { // Check that the stages were unwound in reverse order assert_eq!( - events.collect::>().await, + events.collect::>>().await, vec![ // Executing - PipelineEvent::Prepare { + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, - }, + }), // Unwinding - PipelineEvent::Unwind { + Ok(PipelineEvent::Unwind { stage_id: StageId::Other("C"), input: UnwindInput { checkpoint: StageCheckpoint::new(20), unwind_to: 1, bad_block: None } - }, - PipelineEvent::Unwound { + }), + Ok(PipelineEvent::Unwound { stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - }, - PipelineEvent::Unwind { + }), + Ok(PipelineEvent::Unwind { stage_id: StageId::Other("B"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 1, bad_block: None } - }, - PipelineEvent::Unwound { + }), + Ok(PipelineEvent::Unwound { stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - }, - PipelineEvent::Unwind { + }), + Ok(PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), unwind_to: 1, bad_block: None } - }, - PipelineEvent::Unwound { + }), + Ok(PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, - }, + }), ] ); } @@ -868,58 +872,58 @@ mod tests { // Check that the stages were unwound in reverse order assert_eq!( - events.collect::>().await, + events.collect::>>().await, vec![ // Executing - PipelineEvent::Prepare { + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, + }), // Unwinding // Nothing to unwind in stage "B" - PipelineEvent::Skipped { stage_id: StageId::Other("B") }, - PipelineEvent::Unwind { + Ok(PipelineEvent::Skipped { stage_id: StageId::Other("B") }), + Ok(PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), unwind_to: 50, bad_block: None } - }, - PipelineEvent::Unwound { + }), + Ok(PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(50) }, - }, + }), ] ); } @@ -978,84 +982,84 @@ mod tests { // Check that the stages were unwound in reverse order assert_eq!( - events.collect::>().await, + events.collect::>>().await, vec![ - PipelineEvent::Prepare { + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Error { stage_id: StageId::Other("B") }, - PipelineEvent::Unwind { + }), + Ok(PipelineEvent::Error { stage_id: StageId::Other("B") }), + Ok(PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 0, bad_block: Some(5) } - }, - PipelineEvent::Unwound { + }), + Ok(PipelineEvent::Unwound { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: Some(StageCheckpoint::new(0)), target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), checkpoint: Some(StageCheckpoint::new(0)), target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, - PipelineEvent::Prepare { + }), + Ok(PipelineEvent::Prepare { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Run { + }), + Ok(PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), checkpoint: None, target: Some(10), - }, - PipelineEvent::Ran { + }), + Ok(PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, - }, + }), ] ); } diff --git a/crates/static-file/Cargo.toml b/crates/static-file/Cargo.toml index 1345b2f232f..0f6608c8084 100644 --- a/crates/static-file/Cargo.toml +++ b/crates/static-file/Cargo.toml @@ -21,6 +21,7 @@ reth-nippy-jar.workspace = true reth-tokio-util.workspace = true # async +tokio.workspace = true tokio-stream.workspace = true # misc diff --git a/crates/static-file/src/static_file_producer.rs b/crates/static-file/src/static_file_producer.rs index c7a365c9afa..8c3fa7d0d55 100644 --- a/crates/static-file/src/static_file_producer.rs +++ b/crates/static-file/src/static_file_producer.rs @@ -16,7 +16,7 @@ use std::{ sync::Arc, time::Instant, }; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::BroadcastStream; use tracing::{debug, trace}; /// Result of [StaticFileProducerInner::run] execution. @@ -111,7 +111,7 @@ impl StaticFileProducerInner { } /// Listen for events on the static_file_producer. - pub fn events(&mut self) -> UnboundedReceiverStream { + pub fn events(&self) -> BroadcastStream { self.listeners.new_listener() } @@ -123,7 +123,7 @@ impl StaticFileProducerInner { /// /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic /// lives in the `prune` crate. - pub fn run(&mut self, targets: StaticFileTargets) -> StaticFileProducerResult { + pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult { // If there are no targets, do not produce any static files and return early if !targets.any() { return Ok(targets) @@ -304,7 +304,7 @@ mod tests { fn run() { let (provider_factory, static_file_provider, _temp_static_files_dir) = setup(); - let mut static_file_producer = StaticFileProducerInner::new( + let static_file_producer = StaticFileProducerInner::new( provider_factory, static_file_provider.clone(), PruneModes::default(), @@ -392,7 +392,7 @@ mod tests { let tx = tx.clone(); std::thread::spawn(move || { - let mut locked_producer = producer.lock(); + let locked_producer = producer.lock(); if i == 0 { // Let other threads spawn as well. std::thread::sleep(Duration::from_millis(100)); diff --git a/crates/tokio-util/Cargo.toml b/crates/tokio-util/Cargo.toml index e8c21e0fa05..e9689f77340 100644 --- a/crates/tokio-util/Cargo.toml +++ b/crates/tokio-util/Cargo.toml @@ -12,6 +12,7 @@ description = "Additional utilities for working with Tokio in reth." workspace = true [dependencies] +tracing.workspace = true # async tokio = { workspace = true, features = ["sync"] } diff --git a/crates/tokio-util/src/event_listeners.rs b/crates/tokio-util/src/event_listeners.rs index 3c940e28022..66cb07e50fe 100644 --- a/crates/tokio-util/src/event_listeners.rs +++ b/crates/tokio-util/src/event_listeners.rs @@ -1,46 +1,59 @@ -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::broadcast::{self, Sender}; +use tokio_stream::wrappers::BroadcastStream; +use tracing::{error, warn}; -/// A collection of event listeners for a task. -#[derive(Clone, Debug)] +const DEFAULT_BROADCAST_CHANNEL_SIZE: usize = 1000; + +/// A bounded broadcast channel for a task. +#[derive(Debug)] pub struct EventListeners { - /// All listeners for events - listeners: Vec>, + /// The sender part of the broadcast channel + sender: Sender, +} + +impl Clone for EventListeners { + fn clone(&self) -> Self { + EventListeners { sender: self.sender.clone() } + } } -impl Default for EventListeners { +impl Default for EventListeners { fn default() -> Self { - Self { listeners: Vec::new() } + Self::new(DEFAULT_BROADCAST_CHANNEL_SIZE) } } -impl EventListeners { - /// Send an event to all listeners. - /// - /// Channels that were closed are removed. - pub fn notify(&mut self, event: T) { - self.listeners.retain(|listener| listener.send(event.clone()).is_ok()) +impl EventListeners { + /// Creates a new `EventListeners`. + pub fn new(broadcast_channel_size: usize) -> Self { + let (sender, _) = broadcast::channel(broadcast_channel_size); + Self { sender } } - /// Add a new event listener. - pub fn new_listener(&mut self) -> UnboundedReceiverStream { - let (sender, receiver) = mpsc::unbounded_channel(); - self.listeners.push(sender); - UnboundedReceiverStream::new(receiver) + /// Broadcast sender setter. + pub fn set_sender(&mut self, sender: Sender) { + self.sender = sender; } - /// Push new event listener. - pub fn push_listener(&mut self, listener: mpsc::UnboundedSender) { - self.listeners.push(listener); + /// Broadcasts an event to all listeners. + pub fn notify(&self, event: T) { + match self.sender.send(event) { + Ok(listener_count) => { + if listener_count == 0 { + warn!("notification of network event with 0 listeners"); + } + } + Err(_) => error!("channel closed"), + }; } - /// Returns the number of registered listeners. - pub fn len(&self) -> usize { - self.listeners.len() + /// Sender cloner. + pub fn clone_sender(&self) -> Sender { + self.sender.clone() } - /// Returns true if there are no registered listeners. - pub fn is_empty(&self) -> bool { - self.listeners.is_empty() + /// Adds a new event listener and returns the associated receiver. + pub fn new_listener(&self) -> BroadcastStream { + BroadcastStream::new(self.sender.subscribe()) } } diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index acfa3d82b1d..1a3c29419c6 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -72,7 +72,7 @@ async fn main() { while let Some(evt) = events.next().await { // For the sake of the example we only print the session established event // with the chain specific details - if let NetworkEvent::SessionEstablished { status, client_version, .. } = evt { + if let Ok(NetworkEvent::SessionEstablished { status, client_version, .. }) = evt { let chain = status.chain; info!(?chain, ?client_version, "Session established with a new peer."); }