Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/supervisor/core/src/l1_watcher/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::SupervisorError;
use crate::{SupervisorError, syncnode::ManagedNodeController};
use alloy_primitives::B256;
use alloy_transport::mock::*;
use kona_supervisor_storage::{ChainDb, FinalizedL1Storage, StorageError};
Expand Down Expand Up @@ -242,7 +242,8 @@ mod tests {

fn mock_reorg_handler() -> ReorgHandler<ChainDb> {
let chain_dbs_map: HashMap<ChainId, Arc<ChainDb>> = HashMap::new();
ReorgHandler::new(mock_rpc_client(), chain_dbs_map)
let managed_nodes: HashMap<ChainId, Arc<dyn ManagedNodeController>> = HashMap::new();
ReorgHandler::new(mock_rpc_client(), chain_dbs_map, managed_nodes)
}

#[tokio::test]
Expand Down
17 changes: 14 additions & 3 deletions crates/supervisor/core/src/reorg/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{SupervisorError, reorg::task::ReorgTask};
use crate::{SupervisorError, reorg::task::ReorgTask, syncnode::ManagedNodeController};
use alloy_primitives::ChainId;
use alloy_rpc_client::RpcClient;
use derive_more::Constructor;
Expand All @@ -15,6 +15,8 @@ pub struct ReorgHandler<DB> {
rpc_client: RpcClient,
/// Per chain dbs.
chain_dbs: HashMap<ChainId, Arc<DB>>,
/// Per chain managed nodes
managed_nodes: HashMap<ChainId, Arc<dyn ManagedNodeController>>,
}

impl<DB> ReorgHandler<DB>
Expand All @@ -32,8 +34,17 @@ where
let mut handles = Vec::with_capacity(self.chain_dbs.len());

for (chain_id, chain_db) in &self.chain_dbs {
let reorg_task =
ReorgTask::new(*chain_id, Arc::clone(chain_db), self.rpc_client.clone());
let managed_node = self.managed_nodes.get(chain_id).ok_or(
SupervisorError::Initialise("no managed node found for chain".to_string()),
)?;

let reorg_task = ReorgTask::new(
*chain_id,
Arc::clone(chain_db),
self.rpc_client.clone(),
Arc::clone(managed_node),
);

let handle = tokio::spawn(async move { reorg_task.process_chain_reorg().await });
handles.push(handle);
}
Expand Down
47 changes: 42 additions & 5 deletions crates/supervisor/core/src/reorg/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::SupervisorError;
use crate::{SupervisorError, syncnode::ManagedNodeController};
use alloy_eips::BlockNumHash;
use alloy_primitives::{B256, ChainId};
use alloy_rpc_client::RpcClient;
Expand All @@ -14,6 +14,7 @@ pub(crate) struct ReorgTask<DB> {
chain_id: ChainId,
db: Arc<DB>,
rpc_client: RpcClient,
managed_node: Arc<dyn ManagedNodeController>,
}

impl<DB> ReorgTask<DB>
Expand Down Expand Up @@ -44,6 +45,23 @@ where
);
})?;

trace!(
target: "supervisor::reorg_handler",
chain_id = %self.chain_id,
"Calling resetter to reset the node after reorg"
);

// Reset the node after rewinding the DB.
self.managed_node.reset().await.map_err(|err| {
warn!(
target: "supervisor::reorg_handler",
chain_id = %self.chain_id,
%err,
"Failed to reset node after reorg"
);
SupervisorError::from(err)
})?;

Ok(())
}

Expand Down Expand Up @@ -129,14 +147,16 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::syncnode::{ManagedNodeController, ManagedNodeError};
use alloy_rpc_types_eth::Header;
use alloy_transport::mock::*;
use async_trait::async_trait;
use kona_interop::{DerivedRefPair, SafetyLevel};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{
DerivationStorageReader, HeadRefStorageReader, LogStorageReader, StorageError,
};
use kona_supervisor_types::{Log, SuperHead};
use kona_supervisor_types::{BlockSeal, Log, SuperHead};
use mockall::mock;

mock!(
Expand Down Expand Up @@ -172,6 +192,20 @@ mod tests {
pub chain_db {}
);

mock! (
#[derive(Debug)]
pub ManagedNode {}

#[async_trait]
impl ManagedNodeController for ManagedNode {
async fn reset(&self) -> Result<(), ManagedNodeError>;
async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>;
async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>;
async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>;
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>;
}
);

#[tokio::test]
async fn test_find_rewind_target_without_reorg() {
let mut mock_db = MockDb::new();
Expand Down Expand Up @@ -208,7 +242,8 @@ mod tests {
// Mock RPC response
asserter.push_success(&latest_source);

let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client);
let managed_node = Arc::new(MockManagedNode::new());
let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client, managed_node);
let rewind_target = reorg_task.find_rewind_target().await;

// Should succeed since the latest source block is still canonical
Expand Down Expand Up @@ -342,7 +377,8 @@ mod tests {
// Finally returning the correct block
asserter.push_success(&finalized_source);

let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client);
let managed_node = Arc::new(MockManagedNode::new());
let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client, managed_node);
let rewind_target = reorg_task.find_rewind_target().await;

// Should succeed since the latest source block is still canonical
Expand Down Expand Up @@ -389,7 +425,8 @@ mod tests {
asserter.push_success(&canonical_block);
asserter.push_success(&non_canonical_block);

let reorg_task = ReorgTask::new(1, Arc::new(MockDb::new()), rpc_client);
let managed_node = Arc::new(MockManagedNode::new());
let reorg_task = ReorgTask::new(1, Arc::new(MockDb::new()), rpc_client, managed_node);

let result = reorg_task.is_block_canonical(100, canonical_hash).await;
assert!(result.is_ok());
Expand Down
14 changes: 12 additions & 2 deletions crates/supervisor/core/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use crate::{
l1_watcher::L1Watcher,
reorg::ReorgHandler,
safety_checker::{CrossSafePromoter, CrossUnsafePromoter},
syncnode::{Client, ManagedNode, ManagedNodeClient, ManagedNodeDataProvider},
syncnode::{
Client, ManagedNode, ManagedNodeClient, ManagedNodeController, ManagedNodeDataProvider,
},
};

/// Defines the service for the Supervisor core logic.
Expand Down Expand Up @@ -293,12 +295,20 @@ impl Supervisor {
.map(|chain_id| (*chain_id, self.database_factory.get_db(*chain_id).unwrap()))
.collect();

let managed_nodes = self
.managed_nodes
.iter()
.map(|(chain_id, managed_node)| {
(*chain_id, managed_node.clone() as Arc<dyn ManagedNodeController>)
})
.collect();

let l1_watcher = L1Watcher::new(
l1_rpc.clone(),
self.database_factory.clone(),
senders,
self.cancel_token.clone(),
ReorgHandler::new(l1_rpc, chain_dbs_map),
ReorgHandler::new(l1_rpc, chain_dbs_map, managed_nodes),
);

tokio::spawn(async move {
Expand Down