diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index 211beda4e1..fd5083d4a5 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -35,8 +35,7 @@ mod sequencer; pub use sequencer::{ Conductor, ConductorClient, ConductorError, DelayedL1OriginSelectorProvider, L1OriginSelector, L1OriginSelectorError, L1OriginSelectorProvider, OriginSelector, QueuedSequencerAdminAPIClient, - SequencerActor, SequencerActorBuilder, SequencerActorError, SequencerAdminQuery, - SequencerConfig, + SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig, }; #[cfg(test)] diff --git a/crates/node/service/src/actors/sequencer/actor.rs b/crates/node/service/src/actors/sequencer/actor.rs index 24f40a23e6..146f7204c6 100644 --- a/crates/node/service/src/actors/sequencer/actor.rs +++ b/crates/node/service/src/actors/sequencer/actor.rs @@ -89,32 +89,6 @@ pub struct SequencerActor< pub unsafe_payload_gossip_client: UnsafePayloadGossipClient_, } -impl< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, -> CancellableContext - for SequencerActor< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, - > -where - AttributesBuilder_: AttributesBuilder, - BlockBuildingClient_: BlockBuildingClient, - Conductor_: Conductor, - OriginSelector_: OriginSelector, - UnsafePayloadGossipClient_: UnsafePayloadGossipClient, -{ - fn cancelled(&self) -> WaitForCancellationFuture<'_> { - self.cancellation_token.cancelled() - } -} - impl< AttributesBuilder_, BlockBuildingClient_, @@ -489,6 +463,32 @@ where } } +impl< + AttributesBuilder_, + BlockBuildingClient_, + Conductor_, + OriginSelector_, + UnsafePayloadGossipClient_, +> CancellableContext + for SequencerActor< + AttributesBuilder_, + BlockBuildingClient_, + Conductor_, + OriginSelector_, + UnsafePayloadGossipClient_, + > +where + AttributesBuilder_: AttributesBuilder, + BlockBuildingClient_: BlockBuildingClient, + Conductor_: Conductor, + OriginSelector_: OriginSelector, + UnsafePayloadGossipClient_: UnsafePayloadGossipClient, +{ + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation_token.cancelled() + } +} + // Determines whether the provided [`SealTaskError`] is fatal for the sequencer. // // NB: We could use `err.severity()`, but that gives EngineActor control over this classification. diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs index e25a6316ce..6a9e3c88ac 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs @@ -1,5 +1,5 @@ use crate::{ - BlockEngineError, ConductorError, SequencerActorBuilder, SequencerAdminQuery, + BlockEngineError, ConductorError, SequencerActor, SequencerAdminQuery, actors::{ MockBlockBuildingClient, MockConductor, MockOriginSelector, MockUnsafePayloadGossipClient, }, @@ -16,7 +16,7 @@ use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; // Returns a test SequencerActorBuilder with mocks that can be used or overridden. -fn test_builder() -> SequencerActorBuilder< +fn test_actor() -> SequencerActor< TestAttributesBuilder, MockBlockBuildingClient, MockConductor, @@ -24,16 +24,18 @@ fn test_builder() -> SequencerActorBuilder< MockUnsafePayloadGossipClient, > { let (_admin_api_tx, admin_api_rx) = mpsc::channel(20); - SequencerActorBuilder::new() - .with_active_status(true) - .with_admin_api_receiver(admin_api_rx) - .with_attributes_builder(TestAttributesBuilder { attributes: vec![] }) - .with_block_building_client(MockBlockBuildingClient::new()) - .with_cancellation_token(CancellationToken::new()) - .with_origin_selector(MockOriginSelector::new()) - .with_recovery_mode_status(false) - .with_rollup_config(Arc::new(RollupConfig::default())) - .with_unsafe_payload_gossip_client(MockUnsafePayloadGossipClient::new()) + SequencerActor { + admin_api_rx, + attributes_builder: TestAttributesBuilder { attributes: vec![] }, + block_building_client: MockBlockBuildingClient::new(), + cancellation_token: CancellationToken::new(), + conductor: None, + is_active: true, + in_recovery_mode: false, + origin_selector: MockOriginSelector::new(), + rollup_config: Arc::new(RollupConfig::default()), + unsafe_payload_gossip_client: MockUnsafePayloadGossipClient::new(), + } } #[rstest] @@ -42,7 +44,8 @@ async fn test_is_sequencer_active( #[values(true, false)] active: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_active_status(active).build().unwrap(); + let mut actor = test_actor(); + actor.is_active = active; let result = async { match via_channel { @@ -66,15 +69,10 @@ async fn test_is_conductor_enabled( #[values(true, false)] conductor_exists: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = { - if conductor_exists { - test_builder().with_conductor(MockConductor::new()) - } else { - test_builder() - } - } - .build() - .unwrap(); + let mut actor = test_actor(); + if conductor_exists { + actor.conductor = Some(MockConductor::new()) + }; let result = async { match via_channel { @@ -98,7 +96,8 @@ async fn test_in_recovery_mode( #[values(true, false)] recovery_mode: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_recovery_mode_status(recovery_mode).build().unwrap(); + let mut actor = test_actor(); + actor.in_recovery_mode = recovery_mode; let result = async { match via_channel { @@ -122,7 +121,8 @@ async fn test_start_sequencer( #[values(true, false)] already_started: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_active_status(already_started).build().unwrap(); + let mut actor = test_actor(); + actor.is_active = already_started; // verify starting state let result = actor.is_sequencer_active().await; @@ -164,11 +164,9 @@ async fn test_stop_sequencer_success( let mut client = MockBlockBuildingClient::new(); client.expect_get_unsafe_head().times(1).return_once(move || Ok(unsafe_head)); - let mut actor = test_builder() - .with_block_building_client(client) - .with_active_status(!already_stopped) - .build() - .unwrap(); + let mut actor = test_actor(); + actor.block_building_client = client; + actor.is_active = !already_stopped; // verify starting state let result = actor.is_sequencer_active().await; @@ -205,7 +203,8 @@ async fn test_stop_sequencer_error_fetching_unsafe_head(#[values(true, false)] v .times(1) .return_once(|| Err(BlockEngineError::RequestError("whoops!".to_string()))); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_actor(); + actor.block_building_client = client; let result = async { match via_channel { @@ -234,7 +233,8 @@ async fn test_set_recovery_mode( #[values(true, false)] mode_to_set: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_recovery_mode_status(starting_mode).build().unwrap(); + let mut actor = test_actor(); + actor.in_recovery_mode = starting_mode; // verify starting state let result = actor.in_recovery_mode().await; @@ -277,21 +277,23 @@ async fn test_override_leader( let mut actor = { // wire up conductor absence/presence and response error/success if !conductor_configured { - test_builder() + test_actor() } else if conductor_error { let mut conductor = MockConductor::new(); conductor.expect_override_leader().times(1).return_once(move || { Err(ConductorError::Rpc(RpcError::local_usage_str(conductor_error_string))) }); - test_builder().with_conductor(conductor) + let mut actor = test_actor(); + actor.conductor = Some(conductor); + actor } else { let mut conductor = MockConductor::new(); conductor.expect_override_leader().times(1).return_once(|| Ok(())); - test_builder().with_conductor(conductor) + let mut actor = test_actor(); + actor.conductor = Some(conductor); + actor } - } - .build() - .unwrap(); + }; // call to override leader let result = async { @@ -324,7 +326,8 @@ async fn test_reset_derivation_pipeline_success(#[values(true, false)] via_chann let mut client = MockBlockBuildingClient::new(); client.expect_reset_engine_forkchoice().times(1).return_once(|| Ok(())); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_actor(); + actor.block_building_client = client; let result = async { match via_channel { @@ -350,7 +353,8 @@ async fn test_reset_derivation_pipeline_error(#[values(true, false)] via_channel .times(1) .return_once(|| Err(BlockEngineError::RequestError("reset failed".to_string()))); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_actor(); + actor.block_building_client = client; let result = async { match via_channel { @@ -382,11 +386,9 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { client.expect_get_unsafe_head().times(1).returning(move || Ok(unsafe_head)); client.expect_reset_engine_forkchoice().times(1).returning(|| Ok(())); - let mut actor = test_builder() - .with_conductor(conductor) - .with_block_building_client(client) - .build() - .unwrap(); + let mut actor = test_actor(); + actor.conductor = Some(conductor); + actor.block_building_client = client; let mut queries: Vec = Vec::new(); { diff --git a/crates/node/service/src/actors/sequencer/builder.rs b/crates/node/service/src/actors/sequencer/builder.rs deleted file mode 100644 index 58f504f364..0000000000 --- a/crates/node/service/src/actors/sequencer/builder.rs +++ /dev/null @@ -1,193 +0,0 @@ -//! Builder for [`SequencerActor`]. - -use crate::{ - UnsafePayloadGossipClient, - actors::{ - BlockBuildingClient, - sequencer::{Conductor, OriginSelector, SequencerActor, SequencerAdminQuery}, - }, -}; -use kona_derive::AttributesBuilder; -use kona_genesis::RollupConfig; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; - -/// Builder for constructing a [`SequencerActor`]. -#[derive(Debug, Default)] -pub struct SequencerActorBuilder< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, -> where - AttributesBuilder_: AttributesBuilder, - BlockBuildingClient_: BlockBuildingClient, - Conductor_: Conductor, - OriginSelector_: OriginSelector, - UnsafePayloadGossipClient_: UnsafePayloadGossipClient, -{ - /// Receiver for admin API requests. - pub admin_api_rx: Option>, - /// The attributes builder used for block building. - pub attributes_builder: Option, - /// The struct used to build blocks. - pub block_building_client: Option, - /// The cancellation token, shared between all tasks. - pub cancellation_token: Option, - /// The optional conductor RPC client. - pub conductor: Option, - /// Whether the sequencer is active. - pub is_active: Option, - /// Whether the sequencer is in recovery mode. - pub in_recovery_mode: Option, - /// The struct used to determine the next L1 origin. - pub origin_selector: Option, - /// The rollup configuration. - pub rollup_config: Option>, - /// A client to asynchronously sign and gossip built payloads to the network actor. - pub unsafe_payload_gossip_client: Option, -} - -impl< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, -> - SequencerActorBuilder< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, - > -where - AttributesBuilder_: AttributesBuilder, - BlockBuildingClient_: BlockBuildingClient, - Conductor_: Conductor, - OriginSelector_: OriginSelector, - UnsafePayloadGossipClient_: UnsafePayloadGossipClient, -{ - /// Creates a new empty [`SequencerActorBuilder`]. - pub const fn new() -> Self { - Self { - admin_api_rx: None, - attributes_builder: None, - block_building_client: None, - cancellation_token: None, - conductor: None, - unsafe_payload_gossip_client: None, - is_active: None, - in_recovery_mode: None, - origin_selector: None, - rollup_config: None, - } - } - - /// Sets whether the sequencer is active. - pub const fn with_active_status(mut self, is_active: bool) -> Self { - self.is_active = Some(is_active); - self - } - - /// Sets whether the sequencer is in recovery mode. - pub const fn with_recovery_mode_status(mut self, is_recovery_mode: bool) -> Self { - self.in_recovery_mode = Some(is_recovery_mode); - self - } - - /// Sets the rollup configuration. - pub fn with_rollup_config(mut self, rollup_config: Arc) -> Self { - self.rollup_config = Some(rollup_config); - self - } - - /// Sets the admin API receiver. - pub fn with_admin_api_receiver( - mut self, - admin_api_rx: mpsc::Receiver, - ) -> Self { - self.admin_api_rx = Some(admin_api_rx); - self - } - - /// Sets the attributes builder. - pub fn with_attributes_builder(mut self, attributes_builder: AttributesBuilder_) -> Self { - self.attributes_builder = Some(attributes_builder); - self - } - - /// Sets the conductor. - pub fn with_conductor(mut self, conductor: Conductor_) -> Self { - self.conductor = Some(conductor); - self - } - - /// Sets the origin selector. - pub fn with_origin_selector(mut self, origin_selector: OriginSelector_) -> Self { - self.origin_selector = Some(origin_selector); - self - } - - /// Sets the block engine. - pub fn with_block_building_client( - mut self, - block_building_client: BlockBuildingClient_, - ) -> Self { - self.block_building_client = Some(block_building_client); - self - } - - /// Sets the cancellation token. - pub fn with_cancellation_token(mut self, token: CancellationToken) -> Self { - self.cancellation_token = Some(token); - self - } - - /// Sets the gossip payload sender. - pub fn with_unsafe_payload_gossip_client( - mut self, - gossip_client: UnsafePayloadGossipClient_, - ) -> Self { - self.unsafe_payload_gossip_client = Some(gossip_client); - self - } - - /// Builds the [`SequencerActor`]. - /// - /// # Panics - /// - /// Panics if any required field is not set. - pub fn build( - self, - ) -> Result< - SequencerActor< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, - >, - String, - > { - Ok(SequencerActor { - admin_api_rx: self.admin_api_rx.expect("admin_api_rx is required"), - attributes_builder: self.attributes_builder.expect("attributes_builder is required"), - block_building_client: self - .block_building_client - .expect("block_building_client is required"), - cancellation_token: self.cancellation_token.expect("cancellation is required"), - conductor: self.conductor, - is_active: self.is_active.expect("initial active status not set"), - in_recovery_mode: self.in_recovery_mode.expect("initial recovery mode status not set"), - origin_selector: self.origin_selector.expect("origin_selector is required"), - rollup_config: self.rollup_config.expect("rollup_config is required"), - unsafe_payload_gossip_client: self - .unsafe_payload_gossip_client - .expect("unsafe_payload_gossip_client is required"), - }) - } -} diff --git a/crates/node/service/src/actors/sequencer/mod.rs b/crates/node/service/src/actors/sequencer/mod.rs index ddfc9b9054..4970ceae20 100644 --- a/crates/node/service/src/actors/sequencer/mod.rs +++ b/crates/node/service/src/actors/sequencer/mod.rs @@ -17,8 +17,6 @@ pub use admin_api_client::{QueuedSequencerAdminAPIClient, SequencerAdminQuery}; mod admin_api_impl; -mod builder; -pub use builder::SequencerActorBuilder; mod metrics; mod error; diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index b91ea4251e..35921d0291 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -26,8 +26,8 @@ pub use actors::{ NetworkConfig, NetworkContext, NetworkDriver, NetworkDriverError, NetworkHandler, NetworkInboundData, NodeActor, OriginSelector, PipelineBuilder, QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, QueuedUnsafePayloadGossipClient, ResetRequest, RpcActor, - RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorBuilder, - SequencerActorError, SequencerAdminQuery, SequencerConfig, UnsafePayloadGossipClient, + RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorError, + SequencerAdminQuery, SequencerConfig, UnsafePayloadGossipClient, UnsafePayloadGossipClientError, }; diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 48c89c3af9..c32a0c172e 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -4,10 +4,10 @@ use crate::{ DerivationContext, EngineActor, EngineConfig, EngineContext, InteropMode, L1OriginSelector, L1WatcherActor, NetworkActor, NetworkBuilder, NetworkConfig, NetworkContext, NodeActor, NodeMode, QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, RpcActor, RpcContext, - SequencerConfig, + SequencerActor, SequencerConfig, actors::{ BlockStream, DerivationInboundChannels, EngineInboundData, NetworkInboundData, - QueuedUnsafePayloadGossipClient, SequencerActorBuilder, + QueuedUnsafePayloadGossipClient, }, }; use alloy_eips::BlockNumberOrTag; @@ -17,7 +17,7 @@ use kona_genesis::{L1ChainConfig, RollupConfig}; use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient}; use kona_rpc::RpcBuilder; use op_alloy_network::Optimism; -use std::{sync::Arc, time::Duration}; +use std::{ops::Not as _, sync::Arc, time::Duration}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -144,8 +144,6 @@ impl RollupNode { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); - // 1. CONFIGURE STATE - // Create the derivation actor. let ( DerivationInboundChannels { @@ -188,22 +186,18 @@ impl RollupNode { // Create the RPC server actor. let rpc = self.rpc_builder().map(RpcActor::new); - let (sequencer_actor_builder, sequencer_admin_api_client) = if self.mode().is_sequencer() { - // Create the admin API channel - let (admin_api_tx, admin_api_rx) = mpsc::channel(1024); - - let cfg = self.sequencer_config.clone(); + let delayed_l1_provider = DelayedL1OriginSelectorProvider::new( + self.l1_config.engine_provider.clone(), + l1_head_updates_tx.subscribe(), + self.sequencer_config.l1_conf_delay, + ); - let builder = SequencerActorBuilder::new() - .with_active_status(!cfg.sequencer_stopped) - .with_recovery_mode_status(cfg.sequencer_recovery_mode) - .with_rollup_config(self.config.clone()) - .with_admin_api_receiver(admin_api_rx); + let delayed_origin_selector = + L1OriginSelector::new(self.config.clone(), delayed_l1_provider); - (Some(builder), Some(QueuedSequencerAdminAPIClient::new(admin_api_tx))) - } else { - (None, None) - }; + // Conditionally add conductor if configured + let conductor = + self.sequencer_config.conductor_rpc_url.clone().map(ConductorClient::new_http); // Create the L1 Watcher actor @@ -234,53 +228,47 @@ impl RollupNode { finalized_stream, ); - // 2. CONFIGURE DEPENDENCIES - - let sequencer_actor = sequencer_actor_builder.map_or_else( - || None, - |mut builder| { - let cfg = self.sequencer_config.clone(); - - let l1_provider = DelayedL1OriginSelectorProvider::new( - self.l1_config.engine_provider.clone(), - l1_head_updates_tx.subscribe(), - cfg.l1_conf_delay, - ); + // Create the sequencer if needed + let (sequencer_actor, sequencer_admin_api_tx) = if self.mode().is_sequencer() { + let block_building_client = QueuedBlockBuildingClient { + build_request_tx: build_request_tx.ok_or( + "build_request_tx is None in sequencer mode. This should never happen." + .to_string(), + )?, + reset_request_tx: reset_request_tx.clone(), + seal_request_tx: seal_request_tx.ok_or( + "seal_request_tx is None in sequencer mode. This should never happen." + .to_string(), + )?, + unsafe_head_rx: unsafe_head_rx.ok_or( + "unsafe_head_rx is None in sequencer mode. This should never happen." + .to_string(), + )?, + }; - let origin_selector = L1OriginSelector::new(self.config.clone(), l1_provider); - - let block_building_client = QueuedBlockBuildingClient { - build_request_tx: build_request_tx.expect( - "build_request_tx is None in sequencer mode. This should never happen.", - ), - reset_request_tx: reset_request_tx.clone(), - seal_request_tx: seal_request_tx.expect( - "seal_request_tx is None in sequencer mode. This should never happen.", - ), - unsafe_head_rx: unsafe_head_rx.expect( - "unsafe_head_rx is None in sequencer mode. This should never happen.", - ), - }; - - // Conditionally add conductor if configured - if let Some(conductor_url) = cfg.conductor_rpc_url { - builder = builder.with_conductor(ConductorClient::new_http(conductor_url)); - } - - Some( - builder - .with_attributes_builder(self.create_attributes_builder()) - .with_block_building_client(block_building_client) - .with_cancellation_token(cancellation.clone()) - .with_unsafe_payload_gossip_client(QueuedUnsafePayloadGossipClient::new( - gossip_payload_tx.clone(), - )) - .with_origin_selector(origin_selector) - .build() - .expect("Failed to build SequencerActor"), - ) - }, - ); + // Create the admin API channel + let (sequencer_admin_api_tx, sequencer_admin_api_rx) = mpsc::channel(1024); + let queued_gossip_client = + QueuedUnsafePayloadGossipClient::new(gossip_payload_tx.clone()); + + ( + Some(SequencerActor { + admin_api_rx: sequencer_admin_api_rx, + attributes_builder: self.create_attributes_builder(), + block_building_client, + cancellation_token: cancellation.clone(), + conductor, + is_active: self.sequencer_config.sequencer_stopped.not(), + in_recovery_mode: self.sequencer_config.sequencer_recovery_mode, + origin_selector: delayed_origin_selector, + rollup_config: self.config.clone(), + unsafe_payload_gossip_client: queued_gossip_client, + }), + Some(QueuedSequencerAdminAPIClient::new(sequencer_admin_api_tx)), + ) + } else { + (None, None) + }; crate::service::spawn_and_wait!( cancellation, @@ -291,7 +279,7 @@ impl RollupNode { cancellation: cancellation.clone(), p2p_network: network_rpc, network_admin: net_admin_rpc, - sequencer_admin: sequencer_admin_api_client, + sequencer_admin: sequencer_admin_api_tx, l1_watcher_queries: l1_query_tx, engine_query: engine_rpc, rollup_boost_admin: rollup_boost_admin_rpc,