Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
use tokio::{
select,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot::Receiver as OneshotReceiver,
watch::Receiver as WatchReceiver,
},
Expand Down Expand Up @@ -56,7 +56,7 @@ where
/// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
derivation_signal_rx: UnboundedReceiver<Signal>,
/// The receiver for L1 head update notifications.
l1_head_updates: UnboundedReceiver<BlockInfo>,
l1_head_updates: mpsc::Receiver<BlockInfo>,

/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: UnboundedSender<OpAttributesWithParent>,
Expand Down Expand Up @@ -88,7 +88,7 @@ where
engine_l2_safe_head: WatchReceiver<L2BlockInfo>,
sync_complete_rx: OneshotReceiver<()>,
derivation_signal_rx: UnboundedReceiver<Signal>,
l1_head_updates: UnboundedReceiver<BlockInfo>,
l1_head_updates: mpsc::Receiver<BlockInfo>,
attributes_out: UnboundedSender<OpAttributesWithParent>,
reset_request_tx: UnboundedSender<()>,
cancellation: CancellationToken,
Expand Down
4 changes: 2 additions & 2 deletions crates/node/service/src/actors/engine/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
use std::sync::Arc;
use tokio::{
sync::{
mpsc::{Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender},
mpsc::{self, Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender},
oneshot::Sender as OneshotSender,
watch::Sender as WatchSender,
},
Expand Down Expand Up @@ -79,7 +79,7 @@ impl EngineActor {
attributes_rx: UnboundedReceiver<OpAttributesWithParent>,
unsafe_block_rx: UnboundedReceiver<OpNetworkPayloadEnvelope>,
reset_request_rx: UnboundedReceiver<()>,
finalized_block_rx: UnboundedReceiver<BlockInfo>,
finalized_block_rx: mpsc::Receiver<BlockInfo>,
inbound_queries: Option<MpscReceiver<EngineQueries>>,
cancellation: CancellationToken,
) -> Self {
Expand Down
6 changes: 3 additions & 3 deletions crates/node/service/src/actors/engine/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use kona_engine::{Engine, EngineClient, EngineTask, FinalizeTask};
use kona_protocol::{BlockInfo, OpAttributesWithParent};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc;

/// An internal type alias for L1 block numbers.
type L1BlockNumber = u64;
Expand All @@ -17,7 +17,7 @@ type L2BlockNumber = u64;
#[derive(Debug)]
pub struct L2Finalizer {
/// A channel that receives new finalized L1 blocks intermittently.
finalized_l1_block_rx: UnboundedReceiver<BlockInfo>,
finalized_l1_block_rx: mpsc::Receiver<BlockInfo>,
/// An [`EngineClient`], used to create [`FinalizeTask`]s.
client: Arc<EngineClient>,
/// A map of `L1 block number -> highest derived L2 block number` within the L1 epoch, used to
Expand All @@ -30,7 +30,7 @@ pub struct L2Finalizer {
impl L2Finalizer {
/// Creates a new [`L2Finalizer`] with the given channel receiver for finalized L1 blocks.
pub const fn new(
finalized_l1_block_rx: UnboundedReceiver<BlockInfo>,
finalized_l1_block_rx: mpsc::Receiver<BlockInfo>,
client: Arc<EngineClient>,
) -> Self {
Self { finalized_l1_block_rx, client, awaiting_finalization: BTreeMap::new() }
Expand Down
20 changes: 10 additions & 10 deletions crates/node/service/src/actors/l1_watcher_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{sync::Arc, time::Duration};
use thiserror::Error;
use tokio::{
select,
sync::mpsc::{UnboundedSender, error::SendError},
sync::mpsc::{self, error::SendError},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
Expand All @@ -35,11 +35,11 @@ pub struct L1WatcherRpc {
/// order to get the state from the external watcher.
latest_head: tokio::sync::watch::Sender<Option<BlockInfo>>,
/// The outbound L1 head block sender.
head_sender: UnboundedSender<BlockInfo>,
head_sender: mpsc::Sender<BlockInfo>,
/// The outbound L1 finalized block sender.
finalized_sender: UnboundedSender<BlockInfo>,
finalized_sender: mpsc::Sender<BlockInfo>,
/// The block signer sender.
block_signer_sender: UnboundedSender<Address>,
block_signer_sender: mpsc::Sender<Address>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
/// Inbound queries to the L1 watcher.
Expand All @@ -51,9 +51,9 @@ impl L1WatcherRpc {
pub fn new(
config: Arc<RollupConfig>,
l1_provider: RootProvider,
head_sender: UnboundedSender<BlockInfo>,
finalized_sender: UnboundedSender<BlockInfo>,
block_signer_sender: UnboundedSender<Address>,
head_sender: mpsc::Sender<BlockInfo>,
finalized_sender: mpsc::Sender<BlockInfo>,
block_signer_sender: mpsc::Sender<Address>,
cancellation: CancellationToken,
// Can be None if we disable communication with the L1 watcher.
inbound_queries: Option<tokio::sync::mpsc::Receiver<L1WatcherQueries>>,
Expand Down Expand Up @@ -183,7 +183,7 @@ impl NodeActor for L1WatcherRpc {
}
Some(head_block_info) => {
// Send the head update event to all consumers.
self.head_sender.send(head_block_info)?;
self.head_sender.send(head_block_info).await?;
self.latest_head.send_replace(Some(head_block_info));

// For each log, attempt to construct a `SystemConfigLog`.
Expand All @@ -203,7 +203,7 @@ impl NodeActor for L1WatcherRpc {
target: "l1_watcher",
"Unsafe block signer update: {unsafe_block_signer}"
);
if let Err(e) = self.block_signer_sender.send(unsafe_block_signer) {
if let Err(e) = self.block_signer_sender.send(unsafe_block_signer).await {
error!(
target: "l1_watcher",
"Error sending unsafe block signer update: {e}"
Expand All @@ -218,7 +218,7 @@ impl NodeActor for L1WatcherRpc {
return Err(L1WatcherRpcError::StreamEnded);
}
Some(finalized_block_info) => {
self.finalized_sender.send(finalized_block_info)?;
self.finalized_sender.send(finalized_block_info).await?;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/node/service/src/actors/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
use thiserror::Error;
use tokio::{
select,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
sync::mpsc::{self, UnboundedSender},
};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -48,7 +48,7 @@ pub struct NetworkActor {
/// The sender for [OpNetworkPayloadEnvelope]s received via p2p gossip.
blocks: UnboundedSender<OpNetworkPayloadEnvelope>,
/// The receiver for unsafe block signer updates.
signer: UnboundedReceiver<Address>,
signer: mpsc::Receiver<Address>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
}
Expand All @@ -58,7 +58,7 @@ impl NetworkActor {
pub const fn new(
driver: Network,
blocks: UnboundedSender<OpNetworkPayloadEnvelope>,
signer: UnboundedReceiver<Address>,
signer: mpsc::Receiver<Address>,
cancellation: CancellationToken,
) -> Self {
Self { driver, blocks, signer, cancellation }
Expand Down
8 changes: 4 additions & 4 deletions crates/node/service/src/service/standard/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use alloy_provider::RootProvider;
use async_trait::async_trait;
use op_alloy_network::Optimism;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use kona_genesis::RollupConfig;
Expand Down Expand Up @@ -83,9 +83,9 @@ impl ValidatorNodeService for RollupNode {

fn new_da_watcher(
&self,
new_data_tx: UnboundedSender<BlockInfo>,
new_finalized_tx: UnboundedSender<BlockInfo>,
block_signer_tx: UnboundedSender<Address>,
new_data_tx: mpsc::Sender<BlockInfo>,
new_finalized_tx: mpsc::Sender<BlockInfo>,
block_signer_tx: mpsc::Sender<Address>,
cancellation: CancellationToken,
l1_watcher_inbound_queries: Option<tokio::sync::mpsc::Receiver<L1WatcherQueries>>,
) -> Self::DataAvailabilityWatcher {
Expand Down
17 changes: 7 additions & 10 deletions crates/node/service/src/service/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use kona_rpc::{
RpcLauncherError, WsRPC, WsServer,
};
use std::fmt::Display;
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

/// The [`ValidatorNodeService`] trait defines the common interface for running a validator node
Expand Down Expand Up @@ -61,9 +58,9 @@ pub trait ValidatorNodeService {
/// token is used to gracefully shut down the actor.
fn new_da_watcher(
&self,
new_data_tx: UnboundedSender<BlockInfo>,
new_finalized_tx: UnboundedSender<BlockInfo>,
block_signer_tx: UnboundedSender<Address>,
new_data_tx: mpsc::Sender<BlockInfo>,
new_finalized_tx: mpsc::Sender<BlockInfo>,
block_signer_tx: mpsc::Sender<Address>,
cancellation: CancellationToken,
l1_watcher_inbound_queries: Option<tokio::sync::mpsc::Receiver<L1WatcherQueries>>,
) -> Self::DataAvailabilityWatcher;
Expand All @@ -90,16 +87,16 @@ pub trait ValidatorNodeService {
let cancellation = CancellationToken::new();

// Create channels for communication between actors.
let (new_head_tx, new_head_rx) = mpsc::unbounded_channel();
let (new_finalized_tx, new_finalized_rx) = mpsc::unbounded_channel();
let (new_head_tx, new_head_rx) = mpsc::channel(16);
let (new_finalized_tx, new_finalized_rx) = mpsc::channel(16);
Comment on lines +90 to +91
Copy link

Copilot AI Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider if the fixed channel capacity of 16 is adequate under peak load; making this capacity configurable might help improve resilience in variable workload conditions.

Suggested change
let (new_head_tx, new_head_rx) = mpsc::channel(16);
let (new_finalized_tx, new_finalized_rx) = mpsc::channel(16);
let (new_head_tx, new_head_rx) = mpsc::channel(config.channel_capacity);
let (new_finalized_tx, new_finalized_rx) = mpsc::channel(config.channel_capacity);

Copilot uses AI. Check for mistakes.
let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel();
let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel();
let (sync_complete_tx, sync_complete_rx) = oneshot::channel();
let (runtime_config_tx, runtime_config_rx) = mpsc::unbounded_channel();
let (derivation_signal_tx, derivation_signal_rx) = mpsc::unbounded_channel();
let (reset_request_tx, reset_request_rx) = mpsc::unbounded_channel();

let (block_signer_tx, block_signer_rx) = mpsc::unbounded_channel();
let (block_signer_tx, block_signer_rx) = mpsc::channel(16);
let (l1_watcher_queries_sender, l1_watcher_queries_recv) = tokio::sync::mpsc::channel(1024);
let da_watcher = Some(self.new_da_watcher(
new_head_tx,
Expand Down
Loading