diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index e1bb28d710..04dc3e2645 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -11,11 +11,7 @@ use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; use thiserror::Error; use tokio::{ select, - sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, - oneshot::Receiver as OneshotReceiver, - watch::Receiver as WatchReceiver, - }, + sync::{mpsc, oneshot::Receiver as OneshotReceiver, watch::Receiver as WatchReceiver}, }; use tokio_util::sync::CancellationToken; @@ -54,15 +50,15 @@ where /// occurs. /// /// Specs: - derivation_signal_rx: UnboundedReceiver, + derivation_signal_rx: mpsc::Receiver, /// The receiver for L1 head update notifications. l1_head_updates: mpsc::Receiver, /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. - attributes_out: UnboundedSender, + attributes_out: mpsc::Sender, /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward /// them to the engine. - reset_request_tx: UnboundedSender<()>, + reset_request_tx: mpsc::Sender<()>, /// A flag indicating whether the derivation pipeline is ready to start. engine_ready: bool, @@ -87,10 +83,10 @@ where pipeline: P, engine_l2_safe_head: WatchReceiver, sync_complete_rx: OneshotReceiver<()>, - derivation_signal_rx: UnboundedReceiver, + derivation_signal_rx: mpsc::Receiver, l1_head_updates: mpsc::Receiver, - attributes_out: UnboundedSender, - reset_request_tx: UnboundedSender<()>, + attributes_out: mpsc::Sender, + reset_request_tx: mpsc::Sender<()>, cancellation: CancellationToken, ) -> Self { Self { @@ -187,7 +183,7 @@ where kona_macros::inc!(counter, Metrics::L1_REORG_COUNT); } - self.reset_request_tx.send(()).map_err(|e| { + self.reset_request_tx.send(()).await.map_err(|e| { error!(target: "derivation", ?e, "Failed to send reset request"); DerivationError::Sender(Box::new(e)) })?; @@ -338,6 +334,7 @@ where // Send payload attributes out for processing. self.attributes_out .send(payload_attrs) + .await .map_err(|e| DerivationError::Sender(Box::new(e)))?; Ok(()) diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 54e9115861..811857f305 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -16,7 +16,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; use std::sync::Arc; use tokio::{ sync::{ - mpsc::{self, Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender}, + mpsc::{self, Receiver as MpscReceiver}, oneshot::Sender as OneshotSender, watch::Sender as WatchSender, }, @@ -49,18 +49,18 @@ pub struct EngineActor { /// re-triggered can occur, but we will not block derivation on it. sync_complete_tx: Option>, /// A way for the engine actor to send a [`Signal`] back to the derivation actor. - derivation_signal_tx: UnboundedSender, + derivation_signal_tx: mpsc::Sender, /// Handler for inbound queries to the engine. inbound_queries: Option>, /// A channel to receive [`RuntimeConfig`] from the runtime actor. - runtime_config_rx: UnboundedReceiver, + runtime_config_rx: mpsc::Receiver, /// A channel to receive [`OpAttributesWithParent`] from the derivation actor. - attributes_rx: UnboundedReceiver, + attributes_rx: mpsc::Receiver, /// A channel to receive [`OpNetworkPayloadEnvelope`] from the network actor. - unsafe_block_rx: UnboundedReceiver, + unsafe_block_rx: mpsc::Receiver, /// A channel to receive reset requests. - reset_request_rx: UnboundedReceiver<()>, + reset_request_rx: mpsc::Receiver<()>, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } @@ -74,11 +74,11 @@ impl EngineActor { engine: Engine, engine_l2_safe_head_tx: WatchSender, sync_complete_tx: OneshotSender<()>, - derivation_signal_tx: UnboundedSender, - runtime_config_rx: UnboundedReceiver, - attributes_rx: UnboundedReceiver, - unsafe_block_rx: UnboundedReceiver, - reset_request_rx: UnboundedReceiver<()>, + derivation_signal_tx: mpsc::Sender, + runtime_config_rx: mpsc::Receiver, + attributes_rx: mpsc::Receiver, + unsafe_block_rx: mpsc::Receiver, + reset_request_rx: mpsc::Receiver<()>, finalized_block_rx: mpsc::Receiver, inbound_queries: Option>, cancellation: CancellationToken, @@ -109,7 +109,7 @@ impl EngineActor { // Signal the derivation actor to reset. let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; - match self.derivation_signal_tx.send(signal.signal()) { + match self.derivation_signal_tx.send(signal.signal()).await { Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"), Err(err) => { error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); @@ -143,7 +143,7 @@ impl EngineActor { // a "deposits-only" block and re-executed. At the same time, // the channel and any remaining buffered batches are flushed. warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); - match self.derivation_signal_tx.send(Signal::FlushChannel) { + match self.derivation_signal_tx.send(Signal::FlushChannel).await { Ok(_) => { debug!(target: "engine", "Sent flush signal to derivation actor") } diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs index 171017873b..dc5af34212 100644 --- a/crates/node/service/src/actors/network.rs +++ b/crates/node/service/src/actors/network.rs @@ -8,10 +8,7 @@ use kona_p2p::Network; use libp2p::TransportError; use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; use thiserror::Error; -use tokio::{ - select, - sync::mpsc::{self, UnboundedSender}, -}; +use tokio::{select, sync::mpsc}; use tokio_util::sync::CancellationToken; /// The network actor handles two core networking components of the rollup node: @@ -46,7 +43,7 @@ pub struct NetworkActor { /// Network driver driver: Network, /// The sender for [OpNetworkPayloadEnvelope]s received via p2p gossip. - blocks: UnboundedSender, + blocks: mpsc::Sender, /// The receiver for unsafe block signer updates. signer: mpsc::Receiver
, /// The cancellation token, shared between all tasks. @@ -57,7 +54,7 @@ impl NetworkActor { /// Constructs a new [`NetworkActor`] given the [`Network`] pub const fn new( driver: Network, - blocks: UnboundedSender, + blocks: mpsc::Sender, signer: mpsc::Receiver
, cancellation: CancellationToken, ) -> Self { @@ -94,7 +91,7 @@ impl NodeActor for NetworkActor { block = unsafe_block_receiver.recv() => { match block { Ok(block) => { - match self.blocks.send(block) { + match self.blocks.send(block).await { Ok(_) => debug!(target: "network", "Forwarded unsafe block"), Err(_) => warn!(target: "network", "Failed to forward unsafe block"), } diff --git a/crates/node/service/src/actors/runtime.rs b/crates/node/service/src/actors/runtime.rs index 47a2abac38..70b000dd6a 100644 --- a/crates/node/service/src/actors/runtime.rs +++ b/crates/node/service/src/actors/runtime.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use kona_sources::{RuntimeConfig, RuntimeLoader, RuntimeLoaderError}; use std::time::Duration; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use crate::NodeActor; @@ -19,7 +19,7 @@ pub struct RuntimeActor { /// The interval at which to load the runtime. interval: Duration, /// A channel to send the loaded runtime config to the engine actor. - runtime_config_tx: UnboundedSender, + runtime_config_tx: mpsc::Sender, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } @@ -29,7 +29,7 @@ impl RuntimeActor { pub const fn new( loader: RuntimeLoader, interval: Duration, - runtime_config_tx: UnboundedSender, + runtime_config_tx: mpsc::Sender, cancellation: CancellationToken, ) -> Self { Self { loader, interval, runtime_config_tx, cancellation } @@ -44,7 +44,7 @@ pub struct RuntimeLauncher { /// The interval at which to load the runtime. interval: Option, /// The channel to send the [`RuntimeConfig`] to the engine actor. - tx: Option>, + tx: Option>, /// The cancellation token. cancellation: Option, } @@ -56,7 +56,7 @@ impl RuntimeLauncher { } /// Sets the runtime config tx channel. - pub fn with_tx(self, tx: UnboundedSender) -> Self { + pub fn with_tx(self, tx: mpsc::Sender) -> Self { Self { tx: Some(tx), ..self } } @@ -92,7 +92,7 @@ impl NodeActor for RuntimeActor { _ = interval.tick() => { let config = self.loader.load_latest().await?; debug!(target: "runtime", ?config, "Loaded latest runtime config"); - if let Err(e) = self.runtime_config_tx.send(config) { + if let Err(e) = self.runtime_config_tx.send(config).await { error!(target: "runtime", ?e, "Failed to send runtime config to the engine actor"); } } diff --git a/crates/node/service/src/service/validator.rs b/crates/node/service/src/service/validator.rs index 3fba5dfc59..04683379c3 100644 --- a/crates/node/service/src/service/validator.rs +++ b/crates/node/service/src/service/validator.rs @@ -89,12 +89,12 @@ pub trait ValidatorNodeService { // Create channels for communication between actors. let (new_head_tx, new_head_rx) = mpsc::channel(16); let (new_finalized_tx, new_finalized_rx) = mpsc::channel(16); - let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel(); - let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel(); + let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16); + let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024); let (sync_complete_tx, sync_complete_rx) = oneshot::channel(); - let (runtime_config_tx, runtime_config_rx) = mpsc::unbounded_channel(); - let (derivation_signal_tx, derivation_signal_rx) = mpsc::unbounded_channel(); - let (reset_request_tx, reset_request_rx) = mpsc::unbounded_channel(); + let (runtime_config_tx, runtime_config_rx) = mpsc::channel(16); + let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16); + let (reset_request_tx, reset_request_rx) = mpsc::channel(16); let (block_signer_tx, block_signer_rx) = mpsc::channel(16); let (l1_watcher_queries_sender, l1_watcher_queries_recv) = tokio::sync::mpsc::channel(1024); diff --git a/crates/supervisor/service/src/actors/l1_watcher_rpc.rs b/crates/supervisor/service/src/actors/l1_watcher_rpc.rs index faccceac92..4ddd2e6ee9 100644 --- a/crates/supervisor/service/src/actors/l1_watcher_rpc.rs +++ b/crates/supervisor/service/src/actors/l1_watcher_rpc.rs @@ -15,7 +15,7 @@ use thiserror::Error; use tokio::{ select, sync::{ - mpsc::{UnboundedSender, error::SendError}, + mpsc::{self, error::SendError}, watch, }, task::JoinHandle, @@ -36,7 +36,7 @@ pub struct L1WatcherRpc { /// order to get the state from the external watcher. latest_head: watch::Sender>, /// The outbound event sender. - head_sender: UnboundedSender, + head_sender: mpsc::Sender, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, /// Inbound queries to the L1 watcher. @@ -48,7 +48,7 @@ impl L1WatcherRpc { pub fn new( config: Arc, l1_provider: RootProvider, - head_sender: UnboundedSender, + head_sender: mpsc::Sender, cancellation: CancellationToken, // Can be None if we disable communication with the L1 watcher. inbound_queries: Option>, @@ -192,7 +192,7 @@ impl SupervisorActor for L1WatcherRpc { }; // Send the head update event to all consumers. let head_block_info = self.block_info_by_hash(new_head).await?; - self.head_sender.send(head_block_info)?; + self.head_sender.send(head_block_info).await?; self.latest_head.send_replace(Some(head_block_info)); } } @@ -229,7 +229,7 @@ mod tests { async fn test_l1_watcher_creation() { let config = Arc::new(RollupConfig::default()); let provider = RootProvider::new(RpcClient::mocked(Asserter::new())); - let (head_sender, _) = mpsc::unbounded_channel(); + let (head_sender, _) = mpsc::channel(16); let cancellation = CancellationToken::new(); let (_query_sender, query_receiver) = mpsc::channel(1); @@ -243,7 +243,7 @@ mod tests { async fn test_query_processor_config() { let config = Arc::new(RollupConfig::default()); let provider = RootProvider::new(RpcClient::mocked(Asserter::new())); - let (head_sender, _) = mpsc::unbounded_channel(); + let (head_sender, _) = mpsc::channel(16); let cancellation = CancellationToken::new(); let (query_sender, query_receiver) = mpsc::channel(1); @@ -265,7 +265,7 @@ mod tests { async fn test_l1_state_query() { let config = Arc::new(RollupConfig::default()); let provider = RootProvider::new(RpcClient::mocked(Asserter::new())); - let (head_sender, _) = mpsc::unbounded_channel(); + let (head_sender, _) = mpsc::channel(16); let cancellation = CancellationToken::new(); let (query_sender, query_receiver) = mpsc::channel(1);