diff --git a/crates/node/rpc/src/admin.rs b/crates/node/rpc/src/admin.rs index 124aca7b0f..2bec87d3c9 100644 --- a/crates/node/rpc/src/admin.rs +++ b/crates/node/rpc/src/admin.rs @@ -209,6 +209,18 @@ impl AdminApiServer for AdminRpc { .map_err(|_| ErrorObject::from(ErrorCode::InternalError)) .map(|execution_mode| GetExecutionModeResponse { execution_mode }) } + + async fn admin_reset_derivation_pipeline(&self) -> RpcResult<()> { + // If the sequencer is not enabled (mode runs in validator mode), return an error. + let Some(ref sequencer_client) = self.sequencer_admin_client else { + return Err(ErrorObject::from(ErrorCode::MethodNotFound)); + }; + + sequencer_client + .reset_derivation_pipeline() + .await + .map_err(|_| ErrorObject::from(ErrorCode::InternalError)) + } } /// The admin API client for the sequencer actor. @@ -234,6 +246,9 @@ pub trait SequencerAdminAPIClient: Send + Sync + Debug { /// Override the leader. async fn override_leader(&self) -> Result<(), SequencerAdminAPIError>; + + /// Reset the derivation pipeline. + async fn reset_derivation_pipeline(&self) -> Result<(), SequencerAdminAPIError>; } /// Errors that can occur when using the sequencer admin API. diff --git a/crates/node/rpc/src/jsonrpsee.rs b/crates/node/rpc/src/jsonrpsee.rs index dd90937d06..fd274fa3bd 100644 --- a/crates/node/rpc/src/jsonrpsee.rs +++ b/crates/node/rpc/src/jsonrpsee.rs @@ -200,6 +200,10 @@ pub trait AdminApi { #[method(name = "overrideLeader")] async fn admin_override_leader(&self) -> RpcResult<()>; + /// Resets the derivation pipeline. + #[method(name = "resetDerivationPipeline")] + async fn admin_reset_derivation_pipeline(&self) -> RpcResult<()>; + /// Sets the rollup boost execution mode. #[method(name = "setExecutionMode")] async fn set_execution_mode( diff --git a/crates/node/service/src/actors/sequencer/admin_api_client.rs b/crates/node/service/src/actors/sequencer/admin_api_client.rs index 720c8035e4..d1e8f63aac 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_client.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_client.rs @@ -32,6 +32,8 @@ pub enum SequencerAdminQuery { SetRecoveryMode(bool, oneshot::Sender>), /// A query to override the leader. OverrideLeader(oneshot::Sender>), + /// A query to reset the derivation pipeline. + ResetDerivationPipeline(oneshot::Sender>), } #[async_trait] @@ -112,4 +114,15 @@ impl SequencerAdminAPIClient for QueuedSequencerAdminAPIClient { SequencerAdminAPIError::ResponseError("response channel closed".to_string()) })? } + + async fn reset_derivation_pipeline(&self) -> Result<(), SequencerAdminAPIError> { + let (tx, rx) = oneshot::channel(); + + self.request_tx.send(SequencerAdminQuery::ResetDerivationPipeline(tx)).await.map_err( + |_| SequencerAdminAPIError::RequestError("request channel closed".to_string()), + )?; + rx.await.map_err(|_| { + SequencerAdminAPIError::ResponseError("response channel closed".to_string()) + })? + } } diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl.rs b/crates/node/service/src/actors/sequencer/admin_api_impl.rs index 413e79735b..4b1a4dac5e 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl.rs @@ -67,6 +67,11 @@ where warn!(target: "sequencer", "Failed to send response for override_leader query"); } } + SequencerAdminQuery::ResetDerivationPipeline(tx) => { + if tx.send(self.reset_derivation_pipeline().await).is_err() { + warn!(target: "sequencer", "Failed to send response for reset_derivation_pipeline query"); + } + } } } @@ -147,4 +152,12 @@ where Ok(()) } + + pub(super) async fn reset_derivation_pipeline(&mut self) -> Result<(), SequencerAdminAPIError> { + info!(target: "sequencer", "Resetting derivation pipeline"); + self.block_building_client.reset_engine_forkchoice().await.map_err(|e| { + error!(target: "sequencer", err=?e, "Failed to reset engine forkchoice"); + SequencerAdminAPIError::RequestError(format!("Failed to reset engine: {e}")) + }) + } } diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs index 3aa6a1f32c..e25a6316ce 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs @@ -318,6 +318,56 @@ async fn test_override_leader( } } +#[rstest] +#[tokio::test] +async fn test_reset_derivation_pipeline_success(#[values(true, false)] via_channel: bool) { + let mut client = MockBlockBuildingClient::new(); + client.expect_reset_engine_forkchoice().times(1).return_once(|| Ok(())); + + let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + + let result = async { + match via_channel { + false => actor.reset_derivation_pipeline().await, + true => { + let (tx, rx) = oneshot::channel(); + actor.handle_admin_query(SequencerAdminQuery::ResetDerivationPipeline(tx)).await; + rx.await.unwrap() + } + } + } + .await; + + assert!(result.is_ok()); +} + +#[rstest] +#[tokio::test] +async fn test_reset_derivation_pipeline_error(#[values(true, false)] via_channel: bool) { + let mut client = MockBlockBuildingClient::new(); + client + .expect_reset_engine_forkchoice() + .times(1) + .return_once(|| Err(BlockEngineError::RequestError("reset failed".to_string()))); + + let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + + let result = async { + match via_channel { + false => actor.reset_derivation_pipeline().await, + true => { + let (tx, rx) = oneshot::channel(); + actor.handle_admin_query(SequencerAdminQuery::ResetDerivationPipeline(tx)).await; + rx.await.unwrap() + } + } + } + .await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Failed to reset engine")); +} + #[rstest] #[tokio::test] async fn test_handle_admin_query_resilient_to_dropped_receiver() { @@ -330,6 +380,7 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { }; let mut client = MockBlockBuildingClient::new(); client.expect_get_unsafe_head().times(1).returning(move || Ok(unsafe_head)); + client.expect_reset_engine_forkchoice().times(1).returning(|| Ok(())); let mut actor = test_builder() .with_conductor(conductor) @@ -373,6 +424,11 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { let (tx, _rx) = oneshot::channel(); queries.push(SequencerAdminQuery::OverrideLeader(tx)); } + { + // immediately drop receiver + let (tx, _rx) = oneshot::channel(); + queries.push(SequencerAdminQuery::ResetDerivationPipeline(tx)); + } // None of these should fail even if the receiver is dropped for query in queries {