From bdce0d5bdf9a7add3e3c193fae688eb5a5eb450b Mon Sep 17 00:00:00 2001 From: Mablr <59505383+mablr@users.noreply.github.com> Date: Mon, 1 Dec 2025 21:22:51 +0100 Subject: [PATCH 1/2] feat(node): add `admin_resetDerivationPipeline` RPC endpoint --- crates/node/rpc/src/admin.rs | 15 +++++++++++++++ crates/node/rpc/src/jsonrpsee.rs | 4 ++++ .../src/actors/sequencer/admin_api_client.rs | 13 +++++++++++++ .../src/actors/sequencer/admin_api_impl.rs | 13 +++++++++++++ 4 files changed, 45 insertions(+) diff --git a/crates/node/rpc/src/admin.rs b/crates/node/rpc/src/admin.rs index 023ec564fa..91040c952d 100644 --- a/crates/node/rpc/src/admin.rs +++ b/crates/node/rpc/src/admin.rs @@ -197,6 +197,18 @@ impl AdminApiServer for AdminRpc { .map_err(|_| ErrorObject::from(ErrorCode::InternalError)) .map(|execution_mode| GetExecutionModeResponse { execution_mode }) } + + async fn admin_reset_derivation_pipeline(&self) -> RpcResult<()> { + // If the sequencer is not enabled (mode runs in validator mode), return an error. + let Some(ref sequencer_client) = self.sequencer_admin_client else { + return Err(ErrorObject::from(ErrorCode::MethodNotFound)); + }; + + sequencer_client + .reset_derivation_pipeline() + .await + .map_err(|_| ErrorObject::from(ErrorCode::InternalError)) + } } /// The admin API client for the sequencer actor. @@ -219,6 +231,9 @@ pub trait SequencerAdminAPIClient: Send + Sync + Debug { /// Override the leader. async fn override_leader(&self) -> Result<(), SequencerAdminAPIError>; + + /// Reset the derivation pipeline. + async fn reset_derivation_pipeline(&self) -> Result<(), SequencerAdminAPIError>; } /// Errors that can occur when using the sequencer admin API. diff --git a/crates/node/rpc/src/jsonrpsee.rs b/crates/node/rpc/src/jsonrpsee.rs index a63395913a..8aa08b5f4e 100644 --- a/crates/node/rpc/src/jsonrpsee.rs +++ b/crates/node/rpc/src/jsonrpsee.rs @@ -196,6 +196,10 @@ pub trait AdminApi { #[method(name = "overrideLeader")] async fn admin_override_leader(&self) -> RpcResult<()>; + /// Resets the derivation pipeline. + #[method(name = "resetDerivationPipeline")] + async fn admin_reset_derivation_pipeline(&self) -> RpcResult<()>; + /// Sets the rollup boost execution mode. #[method(name = "setExecutionMode")] async fn set_execution_mode( diff --git a/crates/node/service/src/actors/sequencer/admin_api_client.rs b/crates/node/service/src/actors/sequencer/admin_api_client.rs index 2a973fcd22..8010679e26 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_client.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_client.rs @@ -30,6 +30,8 @@ pub enum SequencerAdminQuery { SetRecoveryMode(bool, oneshot::Sender>), /// A query to override the leader. OverrideLeader(oneshot::Sender>), + /// A query to reset the derivation pipeline. + ResetDerivationPipeline(oneshot::Sender>), } #[async_trait] @@ -99,4 +101,15 @@ impl SequencerAdminAPIClient for QueuedSequencerAdminAPIClient { SequencerAdminAPIError::ResponseError("response channel closed".to_string()) })? } + + async fn reset_derivation_pipeline(&self) -> Result<(), SequencerAdminAPIError> { + let (tx, rx) = oneshot::channel(); + + self.request_tx.send(SequencerAdminQuery::ResetDerivationPipeline(tx)).await.map_err( + |_| SequencerAdminAPIError::RequestError("request channel closed".to_string()), + )?; + rx.await.map_err(|_| { + SequencerAdminAPIError::ResponseError("response channel closed".to_string()) + })? + } } diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl.rs b/crates/node/service/src/actors/sequencer/admin_api_impl.rs index f6fedcbe51..244eeaa8ed 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl.rs @@ -62,6 +62,11 @@ where warn!(target: "sequencer", "Failed to send response for override_leader query"); } } + SequencerAdminQuery::ResetDerivationPipeline(tx) => { + if tx.send(self.reset_derivation_pipeline().await).is_err() { + warn!(target: "sequencer", "Failed to send response for reset_derivation_pipeline query"); + } + } } } @@ -126,4 +131,12 @@ where Ok(()) } + + pub(super) async fn reset_derivation_pipeline(&mut self) -> Result<(), SequencerAdminAPIError> { + info!(target: "sequencer", "Resetting derivation pipeline"); + self.block_building_client.reset_engine_forkchoice().await.map_err(|e| { + error!(target: "sequencer", err=?e, "Failed to reset engine forkchoice"); + SequencerAdminAPIError::RequestError(format!("Failed to reset engine: {e}")) + }) + } } From fcf289d12e0051df79607a4407c26fc7488f5df1 Mon Sep 17 00:00:00 2001 From: Mablr <59505383+mablr@users.noreply.github.com> Date: Tue, 2 Dec 2025 09:04:25 +0100 Subject: [PATCH 2/2] Add tests for reset_derivation_pipeline functionality --- .../actors/sequencer/admin_api_impl_test.rs | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs index 3aa6a1f32c..e25a6316ce 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl_test.rs @@ -318,6 +318,56 @@ async fn test_override_leader( } } +#[rstest] +#[tokio::test] +async fn test_reset_derivation_pipeline_success(#[values(true, false)] via_channel: bool) { + let mut client = MockBlockBuildingClient::new(); + client.expect_reset_engine_forkchoice().times(1).return_once(|| Ok(())); + + let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + + let result = async { + match via_channel { + false => actor.reset_derivation_pipeline().await, + true => { + let (tx, rx) = oneshot::channel(); + actor.handle_admin_query(SequencerAdminQuery::ResetDerivationPipeline(tx)).await; + rx.await.unwrap() + } + } + } + .await; + + assert!(result.is_ok()); +} + +#[rstest] +#[tokio::test] +async fn test_reset_derivation_pipeline_error(#[values(true, false)] via_channel: bool) { + let mut client = MockBlockBuildingClient::new(); + client + .expect_reset_engine_forkchoice() + .times(1) + .return_once(|| Err(BlockEngineError::RequestError("reset failed".to_string()))); + + let mut actor = test_builder().with_block_building_client(client).build().unwrap(); + + let result = async { + match via_channel { + false => actor.reset_derivation_pipeline().await, + true => { + let (tx, rx) = oneshot::channel(); + actor.handle_admin_query(SequencerAdminQuery::ResetDerivationPipeline(tx)).await; + rx.await.unwrap() + } + } + } + .await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Failed to reset engine")); +} + #[rstest] #[tokio::test] async fn test_handle_admin_query_resilient_to_dropped_receiver() { @@ -330,6 +380,7 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { }; let mut client = MockBlockBuildingClient::new(); client.expect_get_unsafe_head().times(1).returning(move || Ok(unsafe_head)); + client.expect_reset_engine_forkchoice().times(1).returning(|| Ok(())); let mut actor = test_builder() .with_conductor(conductor) @@ -373,6 +424,11 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() { let (tx, _rx) = oneshot::channel(); queries.push(SequencerAdminQuery::OverrideLeader(tx)); } + { + // immediately drop receiver + let (tx, _rx) = oneshot::channel(); + queries.push(SequencerAdminQuery::ResetDerivationPipeline(tx)); + } // None of these should fail even if the receiver is dropped for query in queries {