Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/node/service/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ mod sequencer;
pub use sequencer::{
Conductor, ConductorClient, ConductorError, DelayedL1OriginSelectorProvider, L1OriginSelector,
L1OriginSelectorError, L1OriginSelectorProvider, OriginSelector, QueuedSequencerAdminAPIClient,
SequencerActor, SequencerActorBuilder, SequencerActorError, SequencerAdminQuery,
SequencerConfig,
SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig,
};

#[cfg(test)]
Expand Down
52 changes: 26 additions & 26 deletions crates/node/service/src/actors/sequencer/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,32 +89,6 @@ pub struct SequencerActor<
pub unsafe_payload_gossip_client: UnsafePayloadGossipClient_,
}

impl<
AttributesBuilder_,
BlockBuildingClient_,
Conductor_,
OriginSelector_,
UnsafePayloadGossipClient_,
> CancellableContext
for SequencerActor<
AttributesBuilder_,
BlockBuildingClient_,
Conductor_,
OriginSelector_,
UnsafePayloadGossipClient_,
>
where
AttributesBuilder_: AttributesBuilder,
BlockBuildingClient_: BlockBuildingClient,
Conductor_: Conductor,
OriginSelector_: OriginSelector,
UnsafePayloadGossipClient_: UnsafePayloadGossipClient,
{
fn cancelled(&self) -> WaitForCancellationFuture<'_> {
self.cancellation_token.cancelled()
}
}

impl<
AttributesBuilder_,
BlockBuildingClient_,
Expand Down Expand Up @@ -489,6 +463,32 @@ where
}
}

impl<
AttributesBuilder_,
BlockBuildingClient_,
Conductor_,
OriginSelector_,
UnsafePayloadGossipClient_,
> CancellableContext
for SequencerActor<
AttributesBuilder_,
BlockBuildingClient_,
Conductor_,
OriginSelector_,
UnsafePayloadGossipClient_,
>
where
AttributesBuilder_: AttributesBuilder,
BlockBuildingClient_: BlockBuildingClient,
Conductor_: Conductor,
OriginSelector_: OriginSelector,
UnsafePayloadGossipClient_: UnsafePayloadGossipClient,
{
fn cancelled(&self) -> WaitForCancellationFuture<'_> {
self.cancellation_token.cancelled()
}
}

// Determines whether the provided [`SealTaskError`] is fatal for the sequencer.
//
// NB: We could use `err.severity()`, but that gives EngineActor control over this classification.
Expand Down
90 changes: 46 additions & 44 deletions crates/node/service/src/actors/sequencer/admin_api_impl_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
BlockEngineError, ConductorError, SequencerActorBuilder, SequencerAdminQuery,
BlockEngineError, ConductorError, SequencerActor, SequencerAdminQuery,
actors::{
MockBlockBuildingClient, MockConductor, MockOriginSelector, MockUnsafePayloadGossipClient,
},
Expand All @@ -16,24 +16,26 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

// Returns a test SequencerActorBuilder with mocks that can be used or overridden.
fn test_builder() -> SequencerActorBuilder<
fn test_actor() -> SequencerActor<
TestAttributesBuilder,
MockBlockBuildingClient,
MockConductor,
MockOriginSelector,
MockUnsafePayloadGossipClient,
> {
let (_admin_api_tx, admin_api_rx) = mpsc::channel(20);
SequencerActorBuilder::new()
.with_active_status(true)
.with_admin_api_receiver(admin_api_rx)
.with_attributes_builder(TestAttributesBuilder { attributes: vec![] })
.with_block_building_client(MockBlockBuildingClient::new())
.with_cancellation_token(CancellationToken::new())
.with_origin_selector(MockOriginSelector::new())
.with_recovery_mode_status(false)
.with_rollup_config(Arc::new(RollupConfig::default()))
.with_unsafe_payload_gossip_client(MockUnsafePayloadGossipClient::new())
SequencerActor {
admin_api_rx,
attributes_builder: TestAttributesBuilder { attributes: vec![] },
block_building_client: MockBlockBuildingClient::new(),
cancellation_token: CancellationToken::new(),
conductor: None,
is_active: true,
in_recovery_mode: false,
origin_selector: MockOriginSelector::new(),
rollup_config: Arc::new(RollupConfig::default()),
unsafe_payload_gossip_client: MockUnsafePayloadGossipClient::new(),
}
}

#[rstest]
Expand All @@ -42,7 +44,8 @@ async fn test_is_sequencer_active(
#[values(true, false)] active: bool,
#[values(true, false)] via_channel: bool,
) {
let mut actor = test_builder().with_active_status(active).build().unwrap();
let mut actor = test_actor();
actor.is_active = active;

let result = async {
match via_channel {
Expand All @@ -66,15 +69,10 @@ async fn test_is_conductor_enabled(
#[values(true, false)] conductor_exists: bool,
#[values(true, false)] via_channel: bool,
) {
let mut actor = {
if conductor_exists {
test_builder().with_conductor(MockConductor::new())
} else {
test_builder()
}
}
.build()
.unwrap();
let mut actor = test_actor();
if conductor_exists {
actor.conductor = Some(MockConductor::new())
};

let result = async {
match via_channel {
Expand All @@ -98,7 +96,8 @@ async fn test_in_recovery_mode(
#[values(true, false)] recovery_mode: bool,
#[values(true, false)] via_channel: bool,
) {
let mut actor = test_builder().with_recovery_mode_status(recovery_mode).build().unwrap();
let mut actor = test_actor();
actor.in_recovery_mode = recovery_mode;

let result = async {
match via_channel {
Expand All @@ -122,7 +121,8 @@ async fn test_start_sequencer(
#[values(true, false)] already_started: bool,
#[values(true, false)] via_channel: bool,
) {
let mut actor = test_builder().with_active_status(already_started).build().unwrap();
let mut actor = test_actor();
actor.is_active = already_started;

// verify starting state
let result = actor.is_sequencer_active().await;
Expand Down Expand Up @@ -164,11 +164,9 @@ async fn test_stop_sequencer_success(
let mut client = MockBlockBuildingClient::new();
client.expect_get_unsafe_head().times(1).return_once(move || Ok(unsafe_head));

let mut actor = test_builder()
.with_block_building_client(client)
.with_active_status(!already_stopped)
.build()
.unwrap();
let mut actor = test_actor();
actor.block_building_client = client;
actor.is_active = !already_stopped;

// verify starting state
let result = actor.is_sequencer_active().await;
Expand Down Expand Up @@ -205,7 +203,8 @@ async fn test_stop_sequencer_error_fetching_unsafe_head(#[values(true, false)] v
.times(1)
.return_once(|| Err(BlockEngineError::RequestError("whoops!".to_string())));

let mut actor = test_builder().with_block_building_client(client).build().unwrap();
let mut actor = test_actor();
actor.block_building_client = client;

let result = async {
match via_channel {
Expand Down Expand Up @@ -234,7 +233,8 @@ async fn test_set_recovery_mode(
#[values(true, false)] mode_to_set: bool,
#[values(true, false)] via_channel: bool,
) {
let mut actor = test_builder().with_recovery_mode_status(starting_mode).build().unwrap();
let mut actor = test_actor();
actor.in_recovery_mode = starting_mode;

// verify starting state
let result = actor.in_recovery_mode().await;
Expand Down Expand Up @@ -277,21 +277,23 @@ async fn test_override_leader(
let mut actor = {
// wire up conductor absence/presence and response error/success
if !conductor_configured {
test_builder()
test_actor()
} else if conductor_error {
let mut conductor = MockConductor::new();
conductor.expect_override_leader().times(1).return_once(move || {
Err(ConductorError::Rpc(RpcError::local_usage_str(conductor_error_string)))
});
test_builder().with_conductor(conductor)
let mut actor = test_actor();
actor.conductor = Some(conductor);
actor
} else {
let mut conductor = MockConductor::new();
conductor.expect_override_leader().times(1).return_once(|| Ok(()));
test_builder().with_conductor(conductor)
let mut actor = test_actor();
actor.conductor = Some(conductor);
actor
}
}
.build()
.unwrap();
};

// call to override leader
let result = async {
Expand Down Expand Up @@ -324,7 +326,8 @@ async fn test_reset_derivation_pipeline_success(#[values(true, false)] via_chann
let mut client = MockBlockBuildingClient::new();
client.expect_reset_engine_forkchoice().times(1).return_once(|| Ok(()));

let mut actor = test_builder().with_block_building_client(client).build().unwrap();
let mut actor = test_actor();
actor.block_building_client = client;

let result = async {
match via_channel {
Expand All @@ -350,7 +353,8 @@ async fn test_reset_derivation_pipeline_error(#[values(true, false)] via_channel
.times(1)
.return_once(|| Err(BlockEngineError::RequestError("reset failed".to_string())));

let mut actor = test_builder().with_block_building_client(client).build().unwrap();
let mut actor = test_actor();
actor.block_building_client = client;

let result = async {
match via_channel {
Expand Down Expand Up @@ -382,11 +386,9 @@ async fn test_handle_admin_query_resilient_to_dropped_receiver() {
client.expect_get_unsafe_head().times(1).returning(move || Ok(unsafe_head));
client.expect_reset_engine_forkchoice().times(1).returning(|| Ok(()));

let mut actor = test_builder()
.with_conductor(conductor)
.with_block_building_client(client)
.build()
.unwrap();
let mut actor = test_actor();
actor.conductor = Some(conductor);
actor.block_building_client = client;

let mut queries: Vec<SequencerAdminQuery> = Vec::new();
{
Expand Down
Loading
Loading