diff --git a/crates/node/service/src/actors/l1_watcher/actor.rs b/crates/node/service/src/actors/l1_watcher/actor.rs new file mode 100644 index 0000000000..b9c072c60c --- /dev/null +++ b/crates/node/service/src/actors/l1_watcher/actor.rs @@ -0,0 +1,212 @@ +//! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP +//! RPC. + +use crate::{ + NodeActor, + actors::{CancellableContext, l1_watcher::error::L1WatcherActorError}, +}; +use alloy_eips::BlockId; +use alloy_primitives::Address; +use alloy_provider::Provider; +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use kona_genesis::{RollupConfig, SystemConfigLog, SystemConfigUpdate, UnsafeBlockSignerUpdate}; +use kona_protocol::BlockInfo; +use kona_rpc::{L1State, L1WatcherQueries}; +use std::sync::Arc; +use tokio::{ + select, + sync::{ + mpsc::{self}, + watch, + }, +}; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; + +/// An L1 chain watcher that checks for L1 block updates over RPC. +#[derive(Debug)] +pub struct L1WatcherActor +where + BS: Stream + Unpin + Send, + L1P: Provider, +{ + /// The [`RollupConfig`] to tell if ecotone is active. + /// This is used to determine if the L1 watcher should check for unsafe block signer updates. + rollup_config: Arc, + /// The L1 provider. + l1_provider: L1P, + /// The inbound queries to the L1 watcher. + inbound_queries: mpsc::Receiver, + /// The latest L1 head block. + latest_head: watch::Sender>, + /// The latest L1 finalized block. + latest_finalized: watch::Sender>, + /// The block signer sender. + block_signer_sender: mpsc::Sender
, + /// The cancellation token, shared between all tasks. + cancellation: CancellationToken, + /// A stream over the latest head. + head_stream: BS, + /// A stream over the finalized block accepted as canonical. + finalized_stream: BS, +} +impl L1WatcherActor +where + BS: Stream + Unpin + Send, + L1P: Provider, +{ + /// Instantiate a new [`L1WatcherActor`]. + #[allow(clippy::too_many_arguments)] + pub const fn new( + rollup_config: Arc, + l1_provider: L1P, + l1_query_rx: mpsc::Receiver, + l1_head_updates_tx: watch::Sender>, + finalized_l1_block_tx: watch::Sender>, + signer: mpsc::Sender
, + cancellation: CancellationToken, + head_stream: BS, + finalized_stream: BS, + ) -> Self { + Self { + rollup_config, + l1_provider, + inbound_queries: l1_query_rx, + latest_head: l1_head_updates_tx, + latest_finalized: finalized_l1_block_tx, + block_signer_sender: signer, + cancellation, + head_stream, + finalized_stream, + } + } +} + +#[async_trait] +impl NodeActor for L1WatcherActor +where + BS: Stream + Unpin + Send + 'static, + L1P: Provider + 'static, +{ + type Error = L1WatcherActorError; + type StartData = (); + + /// Start the main processing loop. + async fn start(mut self, _: Self::StartData) -> Result<(), Self::Error> { + let cancel = self.cancellation.clone(); + let latest_head = self.latest_head.subscribe(); + + loop { + select! { + _ = cancel.cancelled() => { + // Exit the task on cancellation. + info!( + target: "l1_watcher", + "Received shutdown signal. Exiting L1 watcher task." + ); + + return Ok(()); + }, + new_head = self.head_stream.next() => match new_head { + None => { + return Err(L1WatcherActorError::StreamEnded); + } + Some(head_block_info) => { + // Send the head update event to all consumers. + self.latest_head.send_replace(Some(head_block_info)); + + // For each log, attempt to construct a [`SystemConfigLog`]. + // Build the [`SystemConfigUpdate`] from the log. + // If the update is an Unsafe block signer update, send the address + // to the block signer sender. 
+ let filter_address = self.rollup_config.l1_system_config_address; + let logs = self.l1_provider .get_logs(&alloy_rpc_types_eth::Filter::new().address(filter_address).select(head_block_info.hash)).await?; + let ecotone_active = self.rollup_config.is_ecotone_active(head_block_info.timestamp); + for log in logs { + let sys_cfg_log = SystemConfigLog::new(log.into(), ecotone_active); + if let Ok(SystemConfigUpdate::UnsafeBlockSigner(UnsafeBlockSignerUpdate { unsafe_block_signer })) = sys_cfg_log.build() { + info!( + target: "l1_watcher", + "Unsafe block signer update: {unsafe_block_signer}" + ); + if let Err(e) = self.block_signer_sender.send(unsafe_block_signer).await { + error!( + target: "l1_watcher", + "Error sending unsafe block signer update: {e}" + ); + } + } + } + }, + }, + new_finalized = self.finalized_stream.next() => match new_finalized { + None => { + return Err(L1WatcherActorError::StreamEnded); + } + Some(finalized_block_info) => { + self.latest_finalized.send_replace(Some(finalized_block_info)); + } + }, + inbound_query = self.inbound_queries.recv() => match inbound_query { + Some(query) => { + match query { + L1WatcherQueries::Config(sender) => { + if let Err(e) = sender.send((*self.rollup_config).clone()) { + warn!(target: "l1_watcher", error = ?e, "Failed to send L1 config to the query sender"); + } + } + L1WatcherQueries::L1State(sender) => { + let current_l1 = *latest_head.borrow(); + + let head_l1 = match self.l1_provider.get_block(BlockId::latest()).await { + Ok(block) => block, + Err(e) => { + warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest head block"); + None + }}.map(|block| block.into_consensus().into()); + + let finalized_l1 = match self.l1_provider.get_block(BlockId::finalized()).await { + Ok(block) => block, + Err(e) => { + warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest finalized block"); + None + }}.map(|block| block.into_consensus().into()); + + let safe_l1 = match 
self.l1_provider.get_block(BlockId::safe()).await { + Ok(block) => block, + Err(e) => { + warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest safe block"); + None + }}.map(|block| block.into_consensus().into()); + + if let Err(e) = sender.send(L1State { + current_l1, + current_l1_finalized: finalized_l1, + head_l1, + safe_l1, + finalized_l1, + }) { + warn!(target: "l1_watcher", error = ?e, "Failed to send L1 state to the query sender"); + } + } + } + }, + None => { + error!(target: "l1_watcher", "L1 watcher query channel closed unexpectedly, exiting query processor task."); + return Err(L1WatcherActorError::StreamEnded) + } + } + } + } + } +} + +impl CancellableContext for L1WatcherActor +where + BS: Stream + Unpin + Send + 'static, + L1P: Provider, +{ + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} diff --git a/crates/node/service/src/actors/l1_watcher/blockstream.rs b/crates/node/service/src/actors/l1_watcher/blockstream.rs new file mode 100644 index 0000000000..7ecc01bc62 --- /dev/null +++ b/crates/node/service/src/actors/l1_watcher/blockstream.rs @@ -0,0 +1,68 @@ +use std::time::Duration; + +use alloy_eips::BlockNumberOrTag; +use alloy_provider::Provider; +use alloy_rpc_client::PollerBuilder; +use alloy_rpc_types_eth::Block; +use async_stream::stream; +use futures::{Stream, StreamExt}; +use kona_protocol::BlockInfo; + +/// A wrapper around a [`PollerBuilder`] that observes [`BlockInfo`] updates on a [`Provider`]. +/// +/// Note that this stream is not guaranteed to be contiguous. It may miss certain blocks, and +/// yielded items should only be considered to be the latest block matching the given +/// [`BlockNumberOrTag`]. +#[derive(Debug, Clone)] +pub struct BlockStream +where + L1P: Provider, +{ + /// The inner [`Provider`]. + l1_provider: L1P, + /// The block tag to poll for. + tag: BlockNumberOrTag, + /// The poll interval (in seconds). 
+ poll_interval: Duration, +} + +impl BlockStream { + /// Creates a new [`Stream`] instance. + /// + /// # Returns + /// Returns error if the passed [`BlockNumberOrTag`] is of the [`BlockNumberOrTag::Number`] + /// variant. + pub fn new_as_stream( + l1_provider: L1P, + tag: BlockNumberOrTag, + poll_interval: Duration, + ) -> Result + Unpin + Send, String> { + if matches!(tag, BlockNumberOrTag::Number(_)) { + return Err("Invalid BlockNumberOrTag variant - Must be a tag".to_string()); + } + Ok(Self { l1_provider, tag, poll_interval }.into_stream()) + } + + /// Creates a [`Stream`] of [`BlockInfo`]. + pub fn into_stream(self) -> impl Stream + Unpin + Send { + let mut poll_stream = PollerBuilder::<(BlockNumberOrTag, bool), Block>::new( + self.l1_provider.weak_client(), + "eth_getBlockByNumber", + (self.tag, false), + ) + .with_poll_interval(self.poll_interval) + .into_stream(); + + Box::pin(stream! { + let mut last_block = None; + while let Some(next) = poll_stream.next().await { + let info: BlockInfo = next.into_consensus().into(); + + if last_block.map(|b| b != info).unwrap_or(true) { + last_block = Some(info); + yield info; + } + } + }) + } +} diff --git a/crates/node/service/src/actors/l1_watcher/error.rs b/crates/node/service/src/actors/l1_watcher/error.rs new file mode 100644 index 0000000000..9add016e2e --- /dev/null +++ b/crates/node/service/src/actors/l1_watcher/error.rs @@ -0,0 +1,22 @@ +use tokio::sync::mpsc::error::SendError; + +use alloy_eips::BlockId; +use alloy_transport::TransportError; +use thiserror::Error; + +/// The error type for the `L1WatcherActor`. +#[derive(Error, Debug)] +pub enum L1WatcherActorError { + /// Error sending the head update event. + #[error("Error sending the head update event: {0}")] + SendError(#[from] SendError), + /// Error in the transport layer. + #[error("Transport error: {0}")] + Transport(#[from] TransportError), + /// The L1 block was not found. + #[error("L1 block not found: {0}")] + L1BlockNotFound(BlockId), + /// Stream ended unexpectedly. 
+ #[error("Stream ended unexpectedly")] + StreamEnded, +} diff --git a/crates/node/service/src/actors/l1_watcher/mod.rs b/crates/node/service/src/actors/l1_watcher/mod.rs new file mode 100644 index 0000000000..0d2b76fb82 --- /dev/null +++ b/crates/node/service/src/actors/l1_watcher/mod.rs @@ -0,0 +1,8 @@ +mod actor; +pub use actor::L1WatcherActor; + +mod blockstream; +pub use blockstream::BlockStream; + +mod error; +pub use error::L1WatcherActorError; diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs deleted file mode 100644 index 720e82f337..0000000000 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ /dev/null @@ -1,308 +0,0 @@ -//! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP -//! RPC. - -use crate::{NodeActor, actors::CancellableContext}; -use alloy_eips::{BlockId, BlockNumberOrTag}; -use alloy_primitives::{Address, B256}; -use alloy_provider::{Provider, RootProvider}; -use alloy_rpc_client::PollerBuilder; -use alloy_rpc_types_eth::{Block, Log}; -use alloy_transport::TransportError; -use async_stream::stream; -use async_trait::async_trait; -use futures::{Stream, StreamExt}; -use kona_genesis::{RollupConfig, SystemConfigLog, SystemConfigUpdate, UnsafeBlockSignerUpdate}; -use kona_protocol::BlockInfo; -use kona_rpc::{L1State, L1WatcherQueries}; -use std::{sync::Arc, time::Duration}; -use thiserror::Error; -use tokio::{ - select, - sync::{ - mpsc::{self, error::SendError}, - watch, - }, - task::JoinHandle, -}; -use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; - -/// An L1 chain watcher that checks for L1 block updates over RPC. -#[derive(Debug)] -pub struct L1WatcherRpc { - state: L1WatcherRpcState, - /// The inbound queries to the L1 watcher. - pub inbound_queries: tokio::sync::mpsc::Receiver, -} - -/// The configuration for the L1 watcher actor. 
-#[derive(Debug)] -pub struct L1WatcherRpcState { - /// The [`RollupConfig`] to tell if ecotone is active. - /// This is used to determine if the L1 watcher should check for unsafe block signer updates. - pub rollup: Arc, - /// The L1 provider. - pub l1_provider: RootProvider, -} - -impl L1WatcherRpcState { - /// Fetches logs for the given block hash. - async fn fetch_logs(&self, block_hash: B256) -> Result, L1WatcherRpcError> { - let logs = self - .l1_provider - .get_logs(&alloy_rpc_types_eth::Filter::new().select(block_hash)) - .await?; - - Ok(logs) - } - - /// Spins up a task to process inbound queries. - fn start_query_processor( - &self, - mut inbound_queries: tokio::sync::mpsc::Receiver, - head_updates_recv: watch::Receiver>, - ) -> JoinHandle<()> { - // Start the inbound query processor in a separate task to avoid blocking the main task. - // We can cheaply clone the l1 provider here because it is an Arc. - let l1_provider = self.l1_provider.clone(); - let rollup_config = self.rollup.clone(); - - tokio::spawn(async move { - while let Some(query) = inbound_queries.recv().await { - match query { - L1WatcherQueries::Config(sender) => { - if let Err(e) = sender.send((*rollup_config).clone()) { - warn!(target: "l1_watcher", error = ?e, "Failed to send L1 config to the query sender"); - } - } - L1WatcherQueries::L1State(sender) => { - let current_l1 = *head_updates_recv.borrow(); - - let head_l1 = match l1_provider.get_block(BlockId::latest()).await { - Ok(block) => block, - Err(e) => { - warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest head block"); - None - }}.map(|block| block.into_consensus().into()); - - let finalized_l1 = match l1_provider.get_block(BlockId::finalized()).await { - Ok(block) => block, - Err(e) => { - warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest finalized block"); - None - }}.map(|block| block.into_consensus().into()); - - let safe_l1 = match 
l1_provider.get_block(BlockId::safe()).await { - Ok(block) => block, - Err(e) => { - warn!(target: "l1_watcher", error = ?e, "failed to query l1 provider for latest safe block"); - None - }}.map(|block| block.into_consensus().into()); - - if let Err(e) = sender.send(L1State { - current_l1, - current_l1_finalized: finalized_l1, - head_l1, - safe_l1, - finalized_l1, - }) { - warn!(target: "l1_watcher", error = ?e, "Failed to send L1 state to the query sender"); - } - } - } - } - - error!(target: "l1_watcher", "L1 watcher query channel closed unexpectedly, exiting query processor task."); - }) - } -} - -/// The inbound channels for the L1 watcher actor. -#[derive(Debug)] -pub struct L1WatcherRpcInboundChannels { - /// The inbound queries to the L1 watcher. - pub inbound_queries: tokio::sync::mpsc::Sender, -} - -/// The communication context used by the L1 watcher actor. -#[derive(Debug)] -pub struct L1WatcherRpcContext { - /// The latest L1 head block. - pub latest_head: watch::Sender>, - /// The latest L1 finalized block. - pub latest_finalized: watch::Sender>, - /// The block signer sender. - pub block_signer_sender: mpsc::Sender
, - /// The cancellation token, shared between all tasks. - pub cancellation: CancellationToken, -} - -impl CancellableContext for L1WatcherRpcContext { - fn cancelled(&self) -> WaitForCancellationFuture<'_> { - self.cancellation.cancelled() - } -} - -impl L1WatcherRpc { - /// Creates a new [`L1WatcherRpc`] instance. - pub fn new(config: L1WatcherRpcState) -> (L1WatcherRpcInboundChannels, Self) { - let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024); - - let actor = Self { state: config, inbound_queries: l1_watcher_queries_recv }; - (L1WatcherRpcInboundChannels { inbound_queries: l1_watcher_queries_sender }, actor) - } -} - -#[async_trait] -impl NodeActor for L1WatcherRpc { - type Error = L1WatcherRpcError; - type StartData = L1WatcherRpcContext; - - async fn start( - mut self, - L1WatcherRpcContext { latest_head, latest_finalized, block_signer_sender, cancellation }: Self::StartData, - ) -> Result<(), Self::Error> { - let mut head_stream = BlockStream::new( - &self.state.l1_provider, - BlockNumberOrTag::Latest, - Duration::from_secs(4), - ) - .into_stream(); - let mut finalized_stream = BlockStream::new( - &self.state.l1_provider, - BlockNumberOrTag::Finalized, - Duration::from_secs(60), - ) - .into_stream(); - - let inbound_query_processor = - self.state.start_query_processor(self.inbound_queries, latest_head.subscribe()); - - // Start the main processing loop. - loop { - select! { - _ = cancellation.cancelled() => { - // Exit the task on cancellation. - info!( - target: "l1_watcher", - "Received shutdown signal. Exiting L1 watcher task." - ); - - // Kill the inbound query processor. - inbound_query_processor.abort(); - - return Ok(()); - }, - new_head = head_stream.next() => match new_head { - None => { - return Err(L1WatcherRpcError::StreamEnded); - } - Some(head_block_info) => { - // Send the head update event to all consumers. 
- latest_head.send_replace(Some(head_block_info)); - - // For each log, attempt to construct a `SystemConfigLog`. - // Build the `SystemConfigUpdate` from the log. - // If the update is an Unsafe block signer update, send the address - // to the block signer sender. - let logs = self.state.fetch_logs(head_block_info.hash).await?; - let ecotone_active = self.state.rollup.is_ecotone_active(head_block_info.timestamp); - for log in logs { - if log.address() != self.state.rollup.l1_system_config_address { - continue; // Skip logs not related to the system config. - } - - let sys_cfg_log = SystemConfigLog::new(log.into(), ecotone_active); - if let Ok(SystemConfigUpdate::UnsafeBlockSigner(UnsafeBlockSignerUpdate { unsafe_block_signer })) = sys_cfg_log.build() { - info!( - target: "l1_watcher", - "Unsafe block signer update: {unsafe_block_signer}" - ); - if let Err(e) = block_signer_sender.send(unsafe_block_signer).await { - error!( - target: "l1_watcher", - "Error sending unsafe block signer update: {e}" - ); - } - } - } - }, - }, - new_finalized = finalized_stream.next() => match new_finalized { - None => { - return Err(L1WatcherRpcError::StreamEnded); - } - Some(finalized_block_info) => { - latest_finalized.send_replace(Some(finalized_block_info)); - } - } - } - } - } -} - -/// A wrapper around a [`PollerBuilder`] that observes [`BlockId`] updates on a [`RootProvider`]. -/// -/// Note that this stream is not guaranteed to be contiguous. It may miss certain blocks, and -/// yielded items should only be considered to be the latest block matching the given -/// [`BlockNumberOrTag`]. -struct BlockStream<'a> { - /// The inner [`RootProvider`]. - l1_provider: &'a RootProvider, - /// The block tag to poll for. - tag: BlockNumberOrTag, - /// The poll interval (in seconds). - poll_interval: Duration, -} - -impl<'a> BlockStream<'a> { - /// Creates a new [`BlockStream`] instance. 
- /// - /// ## Panics - /// Panics if the passed [`BlockNumberOrTag`] is of the [`BlockNumberOrTag::Number`] variant. - fn new(l1_provider: &'a RootProvider, tag: BlockNumberOrTag, poll_interval: Duration) -> Self { - if matches!(tag, BlockNumberOrTag::Number(_)) { - panic!("Invalid BlockNumberOrTag variant - Must be a tag"); - } - Self { l1_provider, tag, poll_interval } - } - - /// Transforms the watcher into a [`Stream`]. - fn into_stream(self) -> impl Stream + Unpin { - let mut poll_stream = PollerBuilder::<_, Block>::new( - self.l1_provider.weak_client(), - "eth_getBlockByNumber", - (self.tag, false), - ) - .with_poll_interval(self.poll_interval) - .into_stream(); - - Box::pin(stream! { - let mut last_block = None; - while let Some(next) = poll_stream.next().await { - let info: BlockInfo = next.into_consensus().into(); - - if last_block.map(|b| b != info).unwrap_or(true) { - last_block = Some(info); - yield info; - } - } - }) - } -} - -/// The error type for the [`L1WatcherRpc`]. -#[derive(Error, Debug)] -pub enum L1WatcherRpcError { - /// Error sending the head update event. - #[error("Error sending the head update event: {0}")] - SendError(#[from] SendError), - /// Error in the transport layer. - #[error("Transport error: {0}")] - Transport(#[from] TransportError), - /// The L1 block was not found. - #[error("L1 block not found: {0}")] - L1BlockNotFound(BlockId), - /// Stream ended unexpectedly. 
- #[error("Stream ended unexpectedly")] - StreamEnded, -} diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index c579d971b7..211beda4e1 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -21,11 +21,8 @@ pub use derivation::{ DerivationInboundChannels, DerivationState, InboundDerivationMessage, PipelineBuilder, }; -mod l1_watcher_rpc; -pub use l1_watcher_rpc::{ - L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcInboundChannels, - L1WatcherRpcState, -}; +mod l1_watcher; +pub use l1_watcher::{BlockStream, L1WatcherActor, L1WatcherActorError}; mod network; pub use network::{ diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index c8081ea7b0..b91ea4251e 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -16,19 +16,19 @@ pub use service::{ mod actors; pub use actors::{ - BlockBuildingClient, BlockEngineError, BlockEngineResult, BuildRequest, CancellableContext, - Conductor, ConductorClient, ConductorError, DelayedL1OriginSelectorProvider, DerivationActor, - DerivationBuilder, DerivationContext, DerivationError, DerivationInboundChannels, - DerivationState, EngineActor, EngineConfig, EngineContext, EngineError, EngineInboundData, - InboundDerivationMessage, L1OriginSelector, L1OriginSelectorError, L1OriginSelectorProvider, - L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcInboundChannels, - L1WatcherRpcState, L2Finalizer, NetworkActor, NetworkActorError, NetworkBuilder, - NetworkBuilderError, NetworkConfig, NetworkContext, NetworkDriver, NetworkDriverError, - NetworkHandler, NetworkInboundData, NodeActor, OriginSelector, PipelineBuilder, - QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, QueuedUnsafePayloadGossipClient, - ResetRequest, RpcActor, RpcActorError, RpcContext, SealRequest, SequencerActor, - SequencerActorBuilder, SequencerActorError, SequencerAdminQuery, SequencerConfig, 
- UnsafePayloadGossipClient, UnsafePayloadGossipClientError, + BlockBuildingClient, BlockEngineError, BlockEngineResult, BlockStream, BuildRequest, + CancellableContext, Conductor, ConductorClient, ConductorError, + DelayedL1OriginSelectorProvider, DerivationActor, DerivationBuilder, DerivationContext, + DerivationError, DerivationInboundChannels, DerivationState, EngineActor, EngineConfig, + EngineContext, EngineError, EngineInboundData, InboundDerivationMessage, L1OriginSelector, + L1OriginSelectorError, L1OriginSelectorProvider, L1WatcherActor, L1WatcherActorError, + L2Finalizer, NetworkActor, NetworkActorError, NetworkBuilder, NetworkBuilderError, + NetworkConfig, NetworkContext, NetworkDriver, NetworkDriverError, NetworkHandler, + NetworkInboundData, NodeActor, OriginSelector, PipelineBuilder, QueuedBlockBuildingClient, + QueuedSequencerAdminAPIClient, QueuedUnsafePayloadGossipClient, ResetRequest, RpcActor, + RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorBuilder, + SequencerActorError, SequencerAdminQuery, SequencerConfig, UnsafePayloadGossipClient, + UnsafePayloadGossipClientError, }; mod metrics; diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 7176f2c461..48c89c3af9 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -2,25 +2,28 @@ use crate::{ ConductorClient, DelayedL1OriginSelectorProvider, DerivationActor, DerivationBuilder, DerivationContext, EngineActor, EngineConfig, EngineContext, InteropMode, L1OriginSelector, - L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcState, NetworkActor, NetworkBuilder, - NetworkConfig, NetworkContext, NodeActor, NodeMode, QueuedBlockBuildingClient, - QueuedSequencerAdminAPIClient, RpcActor, RpcContext, SequencerConfig, + L1WatcherActor, NetworkActor, NetworkBuilder, NetworkConfig, NetworkContext, NodeActor, + NodeMode, QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, RpcActor, RpcContext, 
+ SequencerConfig, actors::{ - DerivationInboundChannels, EngineInboundData, L1WatcherRpcInboundChannels, - NetworkInboundData, QueuedUnsafePayloadGossipClient, SequencerActorBuilder, + BlockStream, DerivationInboundChannels, EngineInboundData, NetworkInboundData, + QueuedUnsafePayloadGossipClient, SequencerActorBuilder, }, }; +use alloy_eips::BlockNumberOrTag; use alloy_provider::RootProvider; use kona_derive::StatefulAttributesBuilder; use kona_genesis::{L1ChainConfig, RollupConfig}; use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient}; use kona_rpc::RpcBuilder; use op_alloy_network::Optimism; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024; +const HEAD_STREAM_POLL_INTERVAL: u64 = 4; +const FINALIZED_STREAM_POLL_INTERVAL: u64 = 60; /// The configuration for the L1 chain. #[derive(Debug, Clone)] @@ -65,14 +68,6 @@ impl RollupNode { self.engine_config.mode } - /// Returns a DA watcher builder for the node. - fn da_watcher_builder(&self) -> L1WatcherRpcState { - L1WatcherRpcState { - rollup: self.config.clone(), - l1_provider: self.l1_config.engine_provider.clone(), - } - } - /// Returns a derivation builder for the node. fn derivation_builder(&self) -> DerivationBuilder { DerivationBuilder { @@ -151,10 +146,6 @@ impl RollupNode { // 1. CONFIGURE STATE - // Create the DA watcher actor. - let (L1WatcherRpcInboundChannels { inbound_queries: da_watcher_rpc }, da_watcher) = - L1WatcherRpc::new(self.da_watcher_builder()); - // Create the derivation actor. let ( DerivationInboundChannels { @@ -214,6 +205,35 @@ impl RollupNode { (None, None) }; + // Create the L1 Watcher actor + + // A channel to send queries about the state of L1. 
+ let (l1_query_tx, l1_query_rx) = mpsc::channel(1024); + + let head_stream = BlockStream::new_as_stream( + self.l1_config.engine_provider.clone(), + BlockNumberOrTag::Latest, + Duration::from_secs(HEAD_STREAM_POLL_INTERVAL), + )?; + let finalized_stream = BlockStream::new_as_stream( + self.l1_config.engine_provider.clone(), + BlockNumberOrTag::Finalized, + Duration::from_secs(FINALIZED_STREAM_POLL_INTERVAL), + )?; + + // Create the [`L1WatcherActor`]. Previously known as the DA watcher actor. + let l1_watcher = L1WatcherActor::new( + self.config.clone(), + self.l1_config.engine_provider.clone(), + l1_query_rx, + l1_head_updates_tx.clone(), + finalized_l1_block_tx.clone(), + signer, + cancellation.clone(), + head_stream, + finalized_stream, + ); + // 2. CONFIGURE DEPENDENCIES let sequencer_actor = sequencer_actor_builder.map_or_else( @@ -272,7 +292,7 @@ impl RollupNode { p2p_network: network_rpc, network_admin: net_admin_rpc, sequencer_admin: sequencer_admin_api_client, - l1_watcher_queries: da_watcher_rpc, + l1_watcher_queries: l1_query_tx, engine_query: engine_rpc, rollup_boost_admin: rollup_boost_admin_rpc, rollup_boost_health: rollup_boost_health_rpc, @@ -283,15 +303,7 @@ impl RollupNode { network, NetworkContext { blocks: unsafe_block_tx, cancellation: cancellation.clone() } )), - Some(( - da_watcher, - L1WatcherRpcContext { - latest_head: l1_head_updates_tx, - latest_finalized: finalized_l1_block_tx, - block_signer_sender: signer, - cancellation: cancellation.clone(), - } - )), + Some((l1_watcher, ())), Some(( derivation, DerivationContext { diff --git a/docs/docs/pages/node/run/mechanics.mdx b/docs/docs/pages/node/run/mechanics.mdx index 73551536bc..e53a582845 100644 --- a/docs/docs/pages/node/run/mechanics.mdx +++ b/docs/docs/pages/node/run/mechanics.mdx @@ -103,7 +103,7 @@ Key built-in actors include: - **NetworkActor**: Handles P2P networking and block gossip. - **RpcActor**: Runs the node's RPC server. 
- **SupervisorActor**: Integrates with the supervisor RPC API. -- **L1WatcherRpc**: Watches L1 for new blocks and events. +- **L1WatcherActor**: Watches L1 for new blocks and events. #### Extending with Custom Actors