diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs index e25a6316ce..ffc8a5c557 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs @@ -24,16 +24,17 @@ fn test_builder() -> SequencerActorBuilder< MockUnsafePayloadGossipClient, > { let (_admin_api_tx, admin_api_rx) = mpsc::channel(20); - SequencerActorBuilder::new() - .with_active_status(true) - .with_admin_api_receiver(admin_api_rx) - .with_attributes_builder(TestAttributesBuilder { attributes: vec![] }) - .with_block_building_client(MockBlockBuildingClient::new()) - .with_cancellation_token(CancellationToken::new()) - .with_origin_selector(MockOriginSelector::new()) - .with_recovery_mode_status(false) - .with_rollup_config(Arc::new(RollupConfig::default())) - .with_unsafe_payload_gossip_client(MockUnsafePayloadGossipClient::new()) + SequencerActorBuilder::new( + admin_api_rx, + TestAttributesBuilder { attributes: vec![] }, + MockBlockBuildingClient::new(), + MockOriginSelector::new(), + Arc::new(RollupConfig::default()), + MockUnsafePayloadGossipClient::new(), + ) + .with_active_status(true) + .with_cancellation_token(CancellationToken::new()) + .with_recovery_mode_status(false) } #[rstest] @@ -42,7 +43,7 @@ async fn test_is_sequencer_active( #[values(true, false)] active: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_active_status(active).build().unwrap(); + let mut actor = test_builder().with_active_status(active).build(); let result = async { match via_channel { @@ -73,8 +74,7 @@ async fn test_is_conductor_enabled( test_builder() } } - .build() - .unwrap(); + .build(); let result = async { match via_channel { @@ -98,7 +98,7 @@ async fn test_in_recovery_mode( #[values(true, false)] recovery_mode: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_recovery_mode_status(recovery_mode).build().unwrap(); + let mut actor = test_builder().with_recovery_mode_status(recovery_mode).build(); let result = async { match via_channel { @@ -122,7 +122,7 @@ async fn test_start_sequencer( #[values(true, false)] already_started: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_active_status(already_started).build().unwrap(); + let mut actor = test_builder().with_active_status(already_started).build(); // verify starting state let result = actor.is_sequencer_active().await; @@ -167,8 +167,7 @@ async fn test_stop_sequencer_success( let mut actor = test_builder() .with_block_building_client(client) .with_active_status(!already_stopped) - .build() - .unwrap(); + .build(); // verify starting state let result = actor.is_sequencer_active().await; @@ -205,7 +204,7 @@ async fn test_stop_sequencer_error_fetching_unsafe_head(#[values(true, false)] v .times(1) .return_once(|| Err(BlockEngineError::RequestError("whoops!".to_string()))); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_builder().with_block_building_client(client).build(); let result = async { match via_channel { @@ -234,7 +233,7 @@ async fn test_set_recovery_mode( #[values(true, false)] mode_to_set: bool, #[values(true, false)] via_channel: bool, ) { - let mut actor = test_builder().with_recovery_mode_status(starting_mode).build().unwrap(); + let mut actor = test_builder().with_recovery_mode_status(starting_mode).build(); // verify starting state let result = actor.in_recovery_mode().await; @@ -290,8 +289,7 @@ async fn test_override_leader( test_builder().with_conductor(conductor) } } - .build() - .unwrap(); + .build(); // call to override leader let result = async { @@ -324,7 +322,7 @@ async fn test_reset_derivation_pipeline_success(#[values(true, false)] via_chann let mut client = MockBlockBuildingClient::new(); client.expect_reset_engine_forkchoice().times(1).return_once(|| Ok(())); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_builder().with_block_building_client(client).build(); let result = async { match via_channel { @@ -350,7 +348,7 @@ async fn test_reset_derivation_pipeline_error(#[values(true, false)] via_channel .times(1) .return_once(|| Err(BlockEngineError::RequestError("reset failed".to_string()))); - let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + let mut actor = test_builder().with_block_building_client(client).build(); let result = async { match via_channel { @@ -382,11 +380,8 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { client.expect_get_unsafe_head().times(1).returning(move || Ok(unsafe_head)); client.expect_reset_engine_forkchoice().times(1).returning(|| Ok(())); - let mut actor = test_builder() - .with_conductor(conductor) - .with_block_building_client(client) - .build() - .unwrap(); + let mut actor = + test_builder().with_conductor(conductor).with_block_building_client(client).build(); let mut queries: Vec = Vec::new(); { diff --git a/crates/node/service/src/actors/sequencer/builder.rs b/crates/node/service/src/actors/sequencer/builder.rs index 58f504f364..b9d58db930 100644 --- a/crates/node/service/src/actors/sequencer/builder.rs +++ b/crates/node/service/src/actors/sequencer/builder.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; /// Builder for constructing a [`SequencerActor`]. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SequencerActorBuilder< AttributesBuilder_, BlockBuildingClient_, @@ -29,25 +29,25 @@ pub struct SequencerActorBuilder< UnsafePayloadGossipClient_: UnsafePayloadGossipClient, { /// Receiver for admin API requests. - pub admin_api_rx: Option>, + pub admin_api_rx: mpsc::Receiver, /// The attributes builder used for block building. - pub attributes_builder: Option, + pub attributes_builder: AttributesBuilder_, /// The struct used to build blocks. - pub block_building_client: Option, + pub block_building_client: BlockBuildingClient_, /// The cancellation token, shared between all tasks. - pub cancellation_token: Option, + pub cancellation_token: CancellationToken, /// The optional conductor RPC client. pub conductor: Option, /// Whether the sequencer is active. - pub is_active: Option, + pub is_active: bool, /// Whether the sequencer is in recovery mode. - pub in_recovery_mode: Option, + pub in_recovery_mode: bool, /// The struct used to determine the next L1 origin. - pub origin_selector: Option, + pub origin_selector: OriginSelector_, /// The rollup configuration. - pub rollup_config: Option>, + pub rollup_config: Arc, /// A client to asynchronously sign and gossip built payloads to the network actor. - pub unsafe_payload_gossip_client: Option, + pub unsafe_payload_gossip_client: UnsafePayloadGossipClient_, } impl< @@ -72,36 +72,43 @@ where UnsafePayloadGossipClient_: UnsafePayloadGossipClient, { /// Creates a new empty [`SequencerActorBuilder`]. - pub const fn new() -> Self { + pub fn new( + admin_api_rx: mpsc::Receiver, + attributes_builder: AttributesBuilder_, + block_building_client: BlockBuildingClient_, + origin_selector: OriginSelector_, + rollup_config: Arc, + unsafe_payload_gossip_client: UnsafePayloadGossipClient_, + ) -> Self { Self { - admin_api_rx: None, - attributes_builder: None, - block_building_client: None, - cancellation_token: None, + admin_api_rx, + attributes_builder, + block_building_client, + cancellation_token: CancellationToken::new(), conductor: None, - unsafe_payload_gossip_client: None, - is_active: None, - in_recovery_mode: None, - origin_selector: None, - rollup_config: None, + is_active: false, + in_recovery_mode: false, + origin_selector, + rollup_config, + unsafe_payload_gossip_client, } } /// Sets whether the sequencer is active. pub const fn with_active_status(mut self, is_active: bool) -> Self { - self.is_active = Some(is_active); + self.is_active = is_active; self } /// Sets whether the sequencer is in recovery mode. pub const fn with_recovery_mode_status(mut self, is_recovery_mode: bool) -> Self { - self.in_recovery_mode = Some(is_recovery_mode); + self.in_recovery_mode = is_recovery_mode; self } /// Sets the rollup configuration. pub fn with_rollup_config(mut self, rollup_config: Arc) -> Self { - self.rollup_config = Some(rollup_config); + self.rollup_config = rollup_config; self } @@ -110,13 +117,13 @@ where mut self, admin_api_rx: mpsc::Receiver, ) -> Self { - self.admin_api_rx = Some(admin_api_rx); + self.admin_api_rx = admin_api_rx; self } /// Sets the attributes builder. pub fn with_attributes_builder(mut self, attributes_builder: AttributesBuilder_) -> Self { - self.attributes_builder = Some(attributes_builder); + self.attributes_builder = attributes_builder; self } @@ -128,7 +135,7 @@ where /// Sets the origin selector. pub fn with_origin_selector(mut self, origin_selector: OriginSelector_) -> Self { - self.origin_selector = Some(origin_selector); + self.origin_selector = origin_selector; self } @@ -137,13 +144,13 @@ where mut self, block_building_client: BlockBuildingClient_, ) -> Self { - self.block_building_client = Some(block_building_client); + self.block_building_client = block_building_client; self } /// Sets the cancellation token. pub fn with_cancellation_token(mut self, token: CancellationToken) -> Self { - self.cancellation_token = Some(token); + self.cancellation_token = token; self } @@ -152,42 +159,34 @@ where mut self, gossip_client: UnsafePayloadGossipClient_, ) -> Self { - self.unsafe_payload_gossip_client = Some(gossip_client); + self.unsafe_payload_gossip_client = gossip_client; self } - /// Builds the [`SequencerActor`]. - /// - /// # Panics - /// - /// Panics if any required field is not set. + /// Builds the [`SequencerActor`]. + /// + /// This builder cannot fail because all required fields are provided + /// through `new()`. Optional fields (like `conductor`) may be unset. pub fn build( self, - ) -> Result< - SequencerActor< - AttributesBuilder_, - BlockBuildingClient_, - Conductor_, - OriginSelector_, - UnsafePayloadGossipClient_, - >, - String, + ) -> SequencerActor< + AttributesBuilder_, + BlockBuildingClient_, + Conductor_, + OriginSelector_, + UnsafePayloadGossipClient_, > { - Ok(SequencerActor { - admin_api_rx: self.admin_api_rx.expect("admin_api_rx is required"), - attributes_builder: self.attributes_builder.expect("attributes_builder is required"), - block_building_client: self - .block_building_client - .expect("block_building_client is required"), - cancellation_token: self.cancellation_token.expect("cancellation is required"), + SequencerActor { + admin_api_rx: self.admin_api_rx, + attributes_builder: self.attributes_builder, + block_building_client: self.block_building_client, + cancellation_token: self.cancellation_token, conductor: self.conductor, - is_active: self.is_active.expect("initial active status not set"), - in_recovery_mode: self.in_recovery_mode.expect("initial recovery mode status not set"), - origin_selector: self.origin_selector.expect("origin_selector is required"), - rollup_config: self.rollup_config.expect("rollup_config is required"), - unsafe_payload_gossip_client: self - .unsafe_payload_gossip_client - .expect("unsafe_payload_gossip_client is required"), - }) + is_active: self.is_active, + in_recovery_mode: self.in_recovery_mode, + origin_selector: self.origin_selector, + rollup_config: self.rollup_config, + unsafe_payload_gossip_client: self.unsafe_payload_gossip_client, + } } } diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 57d0f0838b..f31a9acd54 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -187,70 +187,50 @@ impl RollupNode { // Create the RPC server actor. let rpc = self.rpc_builder().map(RpcActor::new); - let (sequencer_actor_builder, sequencer_admin_api_client) = if self.mode().is_sequencer() { + let (sequencer_actor, sequencer_admin_api_client) = if self.mode().is_sequencer() { // Create the admin API channel let (admin_api_tx, admin_api_rx) = mpsc::channel(1024); + // 2. CONFIGURE DEPENDENCIES let cfg = self.sequencer_config.clone(); - let builder = SequencerActorBuilder::new() - .with_active_status(!cfg.sequencer_stopped) - .with_recovery_mode_status(cfg.sequencer_recovery_mode) - .with_rollup_config(self.config.clone()) - .with_admin_api_receiver(admin_api_rx); + // will never panic if `mode().is_sequencer()` + let block_building_client = QueuedBlockBuildingClient { + build_request_tx: build_request_tx.unwrap(), + reset_request_tx: reset_request_tx.clone(), + seal_request_tx: seal_request_tx.unwrap(), + unsafe_head_rx: unsafe_head_rx.unwrap(), + }; - (Some(builder), Some(QueuedSequencerAdminAPIClient::new(admin_api_tx))) - } else { - (None, None) - }; - - // 2. CONFIGURE DEPENDENCIES - - let sequencer_actor = sequencer_actor_builder.map_or_else( - || None, - |mut builder| { - let cfg = self.sequencer_config.clone(); - - let l1_provider = DelayedL1OriginSelectorProvider::new( - self.l1_provider.clone(), - l1_head_updates_tx.subscribe(), - cfg.l1_conf_delay, - ); + let l1_provider = DelayedL1OriginSelectorProvider::new( + self.l1_provider.clone(), + l1_head_updates_tx.subscribe(), + cfg.l1_conf_delay, + ); - let origin_selector = L1OriginSelector::new(self.config.clone(), l1_provider); + let origin_selector = L1OriginSelector::new(self.config.clone(), l1_provider); - let block_building_client = QueuedBlockBuildingClient { - build_request_tx: build_request_tx.expect( - "build_request_tx is None in sequencer mode. This should never happen.", - ), - reset_request_tx: reset_request_tx.clone(), - seal_request_tx: seal_request_tx.expect( - "seal_request_tx is None in sequencer mode. This should never happen.", - ), - unsafe_head_rx: unsafe_head_rx.expect( - "unsafe_head_rx is None in sequencer mode. This should never happen.", - ), - }; + let mut builder = SequencerActorBuilder::new( + admin_api_rx, + self.create_attributes_builder(), + block_building_client, + origin_selector, + self.config.clone(), + QueuedUnsafePayloadGossipClient::new(gossip_payload_tx.clone()), + ) + .with_active_status(!cfg.sequencer_stopped) + .with_recovery_mode_status(cfg.sequencer_recovery_mode) + .with_cancellation_token(cancellation.clone()); - // Conditionally add conductor if configured - if let Some(conductor_url) = cfg.conductor_rpc_url { - builder = builder.with_conductor(ConductorClient::new_http(conductor_url)); - } + // Conditionally add conductor if configured + if let Some(conductor_url) = cfg.conductor_rpc_url { + builder = builder.with_conductor(ConductorClient::new_http(conductor_url)); + } - Some( - builder - .with_attributes_builder(self.create_attributes_builder()) - .with_block_building_client(block_building_client) - .with_cancellation_token(cancellation.clone()) - .with_unsafe_payload_gossip_client(QueuedUnsafePayloadGossipClient::new( - gossip_payload_tx.clone(), - )) - .with_origin_selector(origin_selector) - .build() - .expect("Failed to build SequencerActor"), - ) - }, - ); + (Some(builder.build()), Some(QueuedSequencerAdminAPIClient::new(admin_api_tx))) + } else { + (None, None) + }; crate::service::spawn_and_wait!( cancellation,