Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent};
use thiserror::Error;
use tokio::{
select,
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot::Receiver as OneshotReceiver,
watch::Receiver as WatchReceiver,
},
sync::{mpsc, oneshot::Receiver as OneshotReceiver, watch::Receiver as WatchReceiver},
};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -54,15 +50,15 @@ where
/// occurs.
///
/// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
derivation_signal_rx: UnboundedReceiver<Signal>,
derivation_signal_rx: mpsc::Receiver<Signal>,
/// The receiver for L1 head update notifications.
l1_head_updates: mpsc::Receiver<BlockInfo>,

/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: UnboundedSender<OpAttributesWithParent>,
attributes_out: mpsc::Sender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
reset_request_tx: UnboundedSender<()>,
reset_request_tx: mpsc::Sender<()>,

/// A flag indicating whether the derivation pipeline is ready to start.
engine_ready: bool,
Expand All @@ -87,10 +83,10 @@ where
pipeline: P,
engine_l2_safe_head: WatchReceiver<L2BlockInfo>,
sync_complete_rx: OneshotReceiver<()>,
derivation_signal_rx: UnboundedReceiver<Signal>,
derivation_signal_rx: mpsc::Receiver<Signal>,
l1_head_updates: mpsc::Receiver<BlockInfo>,
attributes_out: UnboundedSender<OpAttributesWithParent>,
reset_request_tx: UnboundedSender<()>,
attributes_out: mpsc::Sender<OpAttributesWithParent>,
reset_request_tx: mpsc::Sender<()>,
cancellation: CancellationToken,
) -> Self {
Self {
Expand Down Expand Up @@ -187,7 +183,7 @@ where
kona_macros::inc!(counter, Metrics::L1_REORG_COUNT);
}

self.reset_request_tx.send(()).map_err(|e| {
self.reset_request_tx.send(()).await.map_err(|e| {
error!(target: "derivation", ?e, "Failed to send reset request");
DerivationError::Sender(Box::new(e))
})?;
Expand Down Expand Up @@ -338,6 +334,7 @@ where
// Send payload attributes out for processing.
self.attributes_out
.send(payload_attrs)
.await
.map_err(|e| DerivationError::Sender(Box::new(e)))?;

Ok(())
Expand Down
26 changes: 13 additions & 13 deletions crates/node/service/src/actors/engine/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
use std::sync::Arc;
use tokio::{
sync::{
mpsc::{self, Receiver as MpscReceiver, UnboundedReceiver, UnboundedSender},
mpsc::{self, Receiver as MpscReceiver},
oneshot::Sender as OneshotSender,
watch::Sender as WatchSender,
},
Expand Down Expand Up @@ -49,18 +49,18 @@ pub struct EngineActor {
/// re-triggered can occur, but we will not block derivation on it.
sync_complete_tx: Option<OneshotSender<()>>,
/// A way for the engine actor to send a [`Signal`] back to the derivation actor.
derivation_signal_tx: UnboundedSender<Signal>,
derivation_signal_tx: mpsc::Sender<Signal>,

/// Handler for inbound queries to the engine.
inbound_queries: Option<MpscReceiver<EngineQueries>>,
/// A channel to receive [`RuntimeConfig`] from the runtime actor.
runtime_config_rx: UnboundedReceiver<RuntimeConfig>,
runtime_config_rx: mpsc::Receiver<RuntimeConfig>,
/// A channel to receive [`OpAttributesWithParent`] from the derivation actor.
attributes_rx: UnboundedReceiver<OpAttributesWithParent>,
attributes_rx: mpsc::Receiver<OpAttributesWithParent>,
/// A channel to receive [`OpNetworkPayloadEnvelope`] from the network actor.
unsafe_block_rx: UnboundedReceiver<OpNetworkPayloadEnvelope>,
unsafe_block_rx: mpsc::Receiver<OpNetworkPayloadEnvelope>,
/// A channel to receive reset requests.
reset_request_rx: UnboundedReceiver<()>,
reset_request_rx: mpsc::Receiver<()>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
}
Expand All @@ -74,11 +74,11 @@ impl EngineActor {
engine: Engine,
engine_l2_safe_head_tx: WatchSender<L2BlockInfo>,
sync_complete_tx: OneshotSender<()>,
derivation_signal_tx: UnboundedSender<Signal>,
runtime_config_rx: UnboundedReceiver<RuntimeConfig>,
attributes_rx: UnboundedReceiver<OpAttributesWithParent>,
unsafe_block_rx: UnboundedReceiver<OpNetworkPayloadEnvelope>,
reset_request_rx: UnboundedReceiver<()>,
derivation_signal_tx: mpsc::Sender<Signal>,
runtime_config_rx: mpsc::Receiver<RuntimeConfig>,
attributes_rx: mpsc::Receiver<OpAttributesWithParent>,
unsafe_block_rx: mpsc::Receiver<OpNetworkPayloadEnvelope>,
reset_request_rx: mpsc::Receiver<()>,
finalized_block_rx: mpsc::Receiver<BlockInfo>,
inbound_queries: Option<MpscReceiver<EngineQueries>>,
cancellation: CancellationToken,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl EngineActor {

// Signal the derivation actor to reset.
let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) };
match self.derivation_signal_tx.send(signal.signal()) {
match self.derivation_signal_tx.send(signal.signal()).await {
Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"),
Err(err) => {
error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor");
Expand Down Expand Up @@ -143,7 +143,7 @@ impl EngineActor {
// a "deposits-only" block and re-executed. At the same time,
// the channel and any remaining buffered batches are flushed.
warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline.");
match self.derivation_signal_tx.send(Signal::FlushChannel) {
match self.derivation_signal_tx.send(Signal::FlushChannel).await {
Ok(_) => {
debug!(target: "engine", "Sent flush signal to derivation actor")
}
Expand Down
11 changes: 4 additions & 7 deletions crates/node/service/src/actors/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use kona_p2p::Network;
use libp2p::TransportError;
use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
use thiserror::Error;
use tokio::{
select,
sync::mpsc::{self, UnboundedSender},
};
use tokio::{select, sync::mpsc};
use tokio_util::sync::CancellationToken;

/// The network actor handles two core networking components of the rollup node:
Expand Down Expand Up @@ -46,7 +43,7 @@ pub struct NetworkActor {
/// Network driver
driver: Network,
/// The sender for [OpNetworkPayloadEnvelope]s received via p2p gossip.
blocks: UnboundedSender<OpNetworkPayloadEnvelope>,
blocks: mpsc::Sender<OpNetworkPayloadEnvelope>,
/// The receiver for unsafe block signer updates.
signer: mpsc::Receiver<Address>,
/// The cancellation token, shared between all tasks.
Expand All @@ -57,7 +54,7 @@ impl NetworkActor {
/// Constructs a new [`NetworkActor`] given the [`Network`]
pub const fn new(
driver: Network,
blocks: UnboundedSender<OpNetworkPayloadEnvelope>,
blocks: mpsc::Sender<OpNetworkPayloadEnvelope>,
signer: mpsc::Receiver<Address>,
cancellation: CancellationToken,
) -> Self {
Expand Down Expand Up @@ -94,7 +91,7 @@ impl NodeActor for NetworkActor {
block = unsafe_block_receiver.recv() => {
match block {
Ok(block) => {
match self.blocks.send(block) {
match self.blocks.send(block).await {
Ok(_) => debug!(target: "network", "Forwarded unsafe block"),
Err(_) => warn!(target: "network", "Failed to forward unsafe block"),
}
Expand Down
12 changes: 6 additions & 6 deletions crates/node/service/src/actors/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use async_trait::async_trait;
use kona_sources::{RuntimeConfig, RuntimeLoader, RuntimeLoaderError};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::NodeActor;
Expand All @@ -19,7 +19,7 @@ pub struct RuntimeActor {
/// The interval at which to load the runtime.
interval: Duration,
/// A channel to send the loaded runtime config to the engine actor.
runtime_config_tx: UnboundedSender<RuntimeConfig>,
runtime_config_tx: mpsc::Sender<RuntimeConfig>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
}
Expand All @@ -29,7 +29,7 @@ impl RuntimeActor {
pub const fn new(
loader: RuntimeLoader,
interval: Duration,
runtime_config_tx: UnboundedSender<RuntimeConfig>,
runtime_config_tx: mpsc::Sender<RuntimeConfig>,
cancellation: CancellationToken,
) -> Self {
Self { loader, interval, runtime_config_tx, cancellation }
Expand All @@ -44,7 +44,7 @@ pub struct RuntimeLauncher {
/// The interval at which to load the runtime.
interval: Option<Duration>,
/// The channel to send the [`RuntimeConfig`] to the engine actor.
tx: Option<UnboundedSender<RuntimeConfig>>,
tx: Option<mpsc::Sender<RuntimeConfig>>,
/// The cancellation token.
cancellation: Option<CancellationToken>,
}
Expand All @@ -56,7 +56,7 @@ impl RuntimeLauncher {
}

/// Sets the runtime config tx channel.
pub fn with_tx(self, tx: UnboundedSender<RuntimeConfig>) -> Self {
pub fn with_tx(self, tx: mpsc::Sender<RuntimeConfig>) -> Self {
Self { tx: Some(tx), ..self }
}

Expand Down Expand Up @@ -92,7 +92,7 @@ impl NodeActor for RuntimeActor {
_ = interval.tick() => {
let config = self.loader.load_latest().await?;
debug!(target: "runtime", ?config, "Loaded latest runtime config");
if let Err(e) = self.runtime_config_tx.send(config) {
if let Err(e) = self.runtime_config_tx.send(config).await {
error!(target: "runtime", ?e, "Failed to send runtime config to the engine actor");
}
}
Expand Down
10 changes: 5 additions & 5 deletions crates/node/service/src/service/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ pub trait ValidatorNodeService {
// Create channels for communication between actors.
let (new_head_tx, new_head_rx) = mpsc::channel(16);
let (new_finalized_tx, new_finalized_rx) = mpsc::channel(16);
let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel();
let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel();
let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16);
let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024);
let (sync_complete_tx, sync_complete_rx) = oneshot::channel();
let (runtime_config_tx, runtime_config_rx) = mpsc::unbounded_channel();
let (derivation_signal_tx, derivation_signal_rx) = mpsc::unbounded_channel();
let (reset_request_tx, reset_request_rx) = mpsc::unbounded_channel();
let (runtime_config_tx, runtime_config_rx) = mpsc::channel(16);
let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16);
let (reset_request_tx, reset_request_rx) = mpsc::channel(16);

let (block_signer_tx, block_signer_rx) = mpsc::channel(16);
let (l1_watcher_queries_sender, l1_watcher_queries_recv) = tokio::sync::mpsc::channel(1024);
Expand Down
14 changes: 7 additions & 7 deletions crates/supervisor/service/src/actors/l1_watcher_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use tokio::{
select,
sync::{
mpsc::{UnboundedSender, error::SendError},
mpsc::{self, error::SendError},
watch,
},
task::JoinHandle,
Expand All @@ -36,7 +36,7 @@
/// order to get the state from the external watcher.
latest_head: watch::Sender<Option<BlockInfo>>,
/// The outbound event sender.
head_sender: UnboundedSender<BlockInfo>,
head_sender: mpsc::Sender<BlockInfo>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
/// Inbound queries to the L1 watcher.
Expand All @@ -48,7 +48,7 @@
pub fn new(
config: Arc<RollupConfig>,
l1_provider: RootProvider,
head_sender: UnboundedSender<BlockInfo>,
head_sender: mpsc::Sender<BlockInfo>,
cancellation: CancellationToken,
// Can be None if we disable communication with the L1 watcher.
inbound_queries: Option<tokio::sync::mpsc::Receiver<L1WatcherQueries>>,
Expand Down Expand Up @@ -192,7 +192,7 @@
};
// Send the head update event to all consumers.
let head_block_info = self.block_info_by_hash(new_head).await?;
self.head_sender.send(head_block_info)?;
self.head_sender.send(head_block_info).await?;

Check warning on line 195 in crates/supervisor/service/src/actors/l1_watcher_rpc.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/service/src/actors/l1_watcher_rpc.rs#L195

Added line #L195 was not covered by tests
self.latest_head.send_replace(Some(head_block_info));
}
}
Expand Down Expand Up @@ -229,7 +229,7 @@
async fn test_l1_watcher_creation() {
let config = Arc::new(RollupConfig::default());
let provider = RootProvider::new(RpcClient::mocked(Asserter::new()));
let (head_sender, _) = mpsc::unbounded_channel();
let (head_sender, _) = mpsc::channel(16);
let cancellation = CancellationToken::new();
let (_query_sender, query_receiver) = mpsc::channel(1);

Expand All @@ -243,7 +243,7 @@
async fn test_query_processor_config() {
let config = Arc::new(RollupConfig::default());
let provider = RootProvider::new(RpcClient::mocked(Asserter::new()));
let (head_sender, _) = mpsc::unbounded_channel();
let (head_sender, _) = mpsc::channel(16);
let cancellation = CancellationToken::new();
let (query_sender, query_receiver) = mpsc::channel(1);

Expand All @@ -265,7 +265,7 @@
async fn test_l1_state_query() {
let config = Arc::new(RollupConfig::default());
let provider = RootProvider::new(RpcClient::mocked(Asserter::new()));
let (head_sender, _) = mpsc::unbounded_channel();
let (head_sender, _) = mpsc::channel(16);
let cancellation = CancellationToken::new();
let (query_sender, query_receiver) = mpsc::channel(1);

Expand Down
Loading