From 9ae9ef6860d5e620462c9daf08ef5849fba205d3 Mon Sep 17 00:00:00 2001 From: clabby Date: Fri, 6 Jun 2025 09:07:21 -0400 Subject: [PATCH] feat(node/service): Introduce backpressure in DA watcher channels --- crates/node/service/src/actors/derivation.rs | 6 +++--- .../node/service/src/actors/engine/actor.rs | 4 ++-- .../service/src/actors/engine/finalizer.rs | 6 +++--- .../node/service/src/actors/l1_watcher_rpc.rs | 20 +++++++++---------- crates/node/service/src/actors/network.rs | 6 +++--- .../node/service/src/service/standard/node.rs | 8 ++++---- crates/node/service/src/service/validator.rs | 17 +++++++--------- 7 files changed, 32 insertions(+), 35 deletions(-) diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index 27b3cfb5d6..082e3cdb0c 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -12,7 +12,7 @@ use thiserror::Error; use tokio::{ select, sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, + mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot::Receiver as OneshotReceiver, watch::Receiver as WatchReceiver, }, @@ -56,7 +56,7 @@ where /// Specs: derivation_signal_rx: UnboundedReceiver, /// The receiver for L1 head update notifications. - l1_head_updates: UnboundedReceiver, + l1_head_updates: mpsc::Receiver, /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. attributes_out: UnboundedSender, @@ -88,7 +88,7 @@ where engine_l2_safe_head: WatchReceiver, sync_complete_rx: OneshotReceiver<()>, derivation_signal_rx: UnboundedReceiver, - l1_head_updates: UnboundedReceiver, + l1_head_updates: mpsc::Receiver, attributes_out: UnboundedSender, reset_request_tx: UnboundedSender<()>, cancellation: CancellationToken, diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 43bf11f03f..54e9115861 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -16,7 +16,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; use std::sync::Arc; use tokio::{ sync::{ - mpsc::{Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender}, + mpsc::{self, Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender}, oneshot::Sender as OneshotSender, watch::Sender as WatchSender, }, @@ -79,7 +79,7 @@ impl EngineActor { attributes_rx: UnboundedReceiver, unsafe_block_rx: UnboundedReceiver, reset_request_rx: UnboundedReceiver<()>, - finalized_block_rx: UnboundedReceiver, + finalized_block_rx: mpsc::Receiver, inbound_queries: Option>, cancellation: CancellationToken, ) -> Self { diff --git a/crates/node/service/src/actors/engine/finalizer.rs b/crates/node/service/src/actors/engine/finalizer.rs index 154be9245a..39797d41fe 100644 --- a/crates/node/service/src/actors/engine/finalizer.rs +++ b/crates/node/service/src/actors/engine/finalizer.rs @@ -3,7 +3,7 @@ use kona_engine::{Engine, EngineClient, EngineTask, FinalizeTask}; use kona_protocol::{BlockInfo, OpAttributesWithParent}; use std::{collections::BTreeMap, sync::Arc}; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc; /// An internal type alias for L1 block numbers. type L1BlockNumber = u64; @@ -17,7 +17,7 @@ type L2BlockNumber = u64; #[derive(Debug)] pub struct L2Finalizer { /// A channel that receives new finalized L1 blocks intermittently. - finalized_l1_block_rx: UnboundedReceiver, + finalized_l1_block_rx: mpsc::Receiver, /// An [`EngineClient`], used to create [`FinalizeTask`]s. client: Arc, /// A map of `L1 block number -> highest derived L2 block number` within the L1 epoch, used to @@ -30,7 +30,7 @@ pub struct L2Finalizer { impl L2Finalizer { /// Creates a new [`L2Finalizer`] with the given channel receiver for finalized L1 blocks. pub const fn new( - finalized_l1_block_rx: UnboundedReceiver, + finalized_l1_block_rx: mpsc::Receiver, client: Arc, ) -> Self { Self { finalized_l1_block_rx, client, awaiting_finalization: BTreeMap::new() } diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs index b129228dba..4acd83f62c 100644 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ b/crates/node/service/src/actors/l1_watcher_rpc.rs @@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration}; use thiserror::Error; use tokio::{ select, - sync::mpsc::{UnboundedSender, error::SendError}, + sync::mpsc::{self, error::SendError}, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -35,11 +35,11 @@ pub struct L1WatcherRpc { /// order to get the state from the external watcher. latest_head: tokio::sync::watch::Sender>, /// The outbound L1 head block sender. - head_sender: UnboundedSender, + head_sender: mpsc::Sender, /// The outbound L1 finalized block sender. - finalized_sender: UnboundedSender, + finalized_sender: mpsc::Sender, /// The block signer sender. - block_signer_sender: UnboundedSender
, + block_signer_sender: mpsc::Sender
, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, /// Inbound queries to the L1 watcher. @@ -51,9 +51,9 @@ impl L1WatcherRpc { pub fn new( config: Arc, l1_provider: RootProvider, - head_sender: UnboundedSender, - finalized_sender: UnboundedSender, - block_signer_sender: UnboundedSender
, + head_sender: mpsc::Sender, + finalized_sender: mpsc::Sender, + block_signer_sender: mpsc::Sender
, cancellation: CancellationToken, // Can be None if we disable communication with the L1 watcher. inbound_queries: Option>, @@ -183,7 +183,7 @@ impl NodeActor for L1WatcherRpc { } Some(head_block_info) => { // Send the head update event to all consumers. - self.head_sender.send(head_block_info)?; + self.head_sender.send(head_block_info).await?; self.latest_head.send_replace(Some(head_block_info)); // For each log, attempt to construct a `SystemConfigLog`. @@ -203,7 +203,7 @@ impl NodeActor for L1WatcherRpc { target: "l1_watcher", "Unsafe block signer update: {unsafe_block_signer}" ); - if let Err(e) = self.block_signer_sender.send(unsafe_block_signer) { + if let Err(e) = self.block_signer_sender.send(unsafe_block_signer).await { error!( target: "l1_watcher", "Error sending unsafe block signer update: {e}" @@ -218,7 +218,7 @@ impl NodeActor for L1WatcherRpc { return Err(L1WatcherRpcError::StreamEnded); } Some(finalized_block_info) => { - self.finalized_sender.send(finalized_block_info)?; + self.finalized_sender.send(finalized_block_info).await?; } } } diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs index 211234090e..171017873b 100644 --- a/crates/node/service/src/actors/network.rs +++ b/crates/node/service/src/actors/network.rs @@ -10,7 +10,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; use thiserror::Error; use tokio::{ select, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::mpsc::{self, UnboundedSender}, }; use tokio_util::sync::CancellationToken; @@ -48,7 +48,7 @@ pub struct NetworkActor { /// The sender for [OpNetworkPayloadEnvelope]s received via p2p gossip. blocks: UnboundedSender, /// The receiver for unsafe block signer updates. - signer: UnboundedReceiver
, + signer: mpsc::Receiver
, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } @@ -58,7 +58,7 @@ impl NetworkActor { pub const fn new( driver: Network, blocks: UnboundedSender, - signer: UnboundedReceiver
, + signer: mpsc::Receiver
, cancellation: CancellationToken, ) -> Self { Self { driver, blocks, signer, cancellation } diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index 1a15eb5b75..2b4d696edb 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -9,7 +9,7 @@ use alloy_provider::RootProvider; use async_trait::async_trait; use op_alloy_network::Optimism; use std::sync::Arc; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use kona_genesis::RollupConfig; @@ -83,9 +83,9 @@ impl ValidatorNodeService for RollupNode { fn new_da_watcher( &self, - new_data_tx: UnboundedSender, - new_finalized_tx: UnboundedSender, - block_signer_tx: UnboundedSender
, + new_data_tx: mpsc::Sender, + new_finalized_tx: mpsc::Sender, + block_signer_tx: mpsc::Sender
, cancellation: CancellationToken, l1_watcher_inbound_queries: Option>, ) -> Self::DataAvailabilityWatcher { diff --git a/crates/node/service/src/service/validator.rs b/crates/node/service/src/service/validator.rs index 526c2f508e..3fba5dfc59 100644 --- a/crates/node/service/src/service/validator.rs +++ b/crates/node/service/src/service/validator.rs @@ -15,10 +15,7 @@ use kona_rpc::{ RpcLauncherError, WsRPC, WsServer, }; use std::fmt::Display; -use tokio::sync::{ - mpsc::{self, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; /// The [`ValidatorNodeService`] trait defines the common interface for running a validator node @@ -61,9 +58,9 @@ pub trait ValidatorNodeService { /// token is used to gracefully shut down the actor. fn new_da_watcher( &self, - new_data_tx: UnboundedSender, - new_finalized_tx: UnboundedSender, - block_signer_tx: UnboundedSender
, + new_data_tx: mpsc::Sender, + new_finalized_tx: mpsc::Sender, + block_signer_tx: mpsc::Sender
, cancellation: CancellationToken, l1_watcher_inbound_queries: Option>, ) -> Self::DataAvailabilityWatcher; @@ -90,8 +87,8 @@ pub trait ValidatorNodeService { let cancellation = CancellationToken::new(); // Create channels for communication between actors. - let (new_head_tx, new_head_rx) = mpsc::unbounded_channel(); - let (new_finalized_tx, new_finalized_rx) = mpsc::unbounded_channel(); + let (new_head_tx, new_head_rx) = mpsc::channel(16); + let (new_finalized_tx, new_finalized_rx) = mpsc::channel(16); let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel(); let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel(); let (sync_complete_tx, sync_complete_rx) = oneshot::channel(); @@ -99,7 +96,7 @@ pub trait ValidatorNodeService { let (derivation_signal_tx, derivation_signal_rx) = mpsc::unbounded_channel(); let (reset_request_tx, reset_request_rx) = mpsc::unbounded_channel(); - let (block_signer_tx, block_signer_rx) = mpsc::unbounded_channel(); + let (block_signer_tx, block_signer_rx) = mpsc::channel(16); let (l1_watcher_queries_sender, l1_watcher_queries_recv) = tokio::sync::mpsc::channel(1024); let da_watcher = Some(self.new_da_watcher( new_head_tx,