2 changes: 1 addition & 1 deletion crates/supervisor/core/src/chain_processor/chain.rs
@@ -94,7 +94,7 @@ where
pub async fn start(&mut self) -> Result<(), ChainProcessorError> {
let mut handle_guard = self.task_handle.lock().await;
if handle_guard.is_some() {
warn!(target: "chain_processor", "ChainProcessor is already running");
warn!(target: "chain_processor", chain_id = %self.chain_id, "ChainProcessor is already running");
return Ok(())
}

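The only change in this file is the extra `chain_id` field on the warning. As a minimal sketch of the `tracing` field syntax the diff adopts (assuming the `tracing` and `tracing-subscriber` crates as dependencies; `Demo` is a stand-in type, not the crate's real `ChainProcessor`), the `%` sigil records the field via its `Display` impl so every log line carries the chain it belongs to:

```rust
use tracing::warn;

struct Demo {
    chain_id: u64,
}

impl Demo {
    fn already_running(&self) {
        // `%` records the field with its `Display` impl, so the emitted
        // log line identifies which chain the warning belongs to.
        warn!(target: "chain_processor", chain_id = %self.chain_id, "ChainProcessor is already running");
    }
}

fn main() {
    tracing_subscriber::fmt::init();
    Demo { chain_id: 10 }.already_running();
}
```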
4 changes: 2 additions & 2 deletions crates/supervisor/core/src/chain_processor/task.rs
@@ -46,13 +46,13 @@ where
/// Creates a new [`ChainProcessorTask`].
pub fn new(
rollup_config: RollupConfig,
chain_id: u64,
chain_id: ChainId,
managed_node: Arc<P>,
state_manager: Arc<W>,
cancel_token: CancellationToken,
event_rx: mpsc::Receiver<ChainEvent>,
) -> Self {
let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone());
let log_indexer = LogIndexer::new(chain_id, managed_node.clone(), state_manager.clone());
Self {
rollup_config,
chain_id,
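`ChainId` from `alloy_primitives` is an alias for `u64`, so the signature change here is type-level only; the functional change is that the constructor now forwards the id to `LogIndexer::new`. A minimal sketch under those assumptions, with simplified stand-in types rather than the real `ChainProcessorTask` and `LogIndexer`:

```rust
// `ChainId` modelled as the `u64` alias it is in `alloy_primitives`.
type ChainId = u64;

struct LogIndexer {
    chain_id: ChainId,
}

impl LogIndexer {
    fn new(chain_id: ChainId) -> Self {
        Self { chain_id }
    }
}

struct ChainProcessorTask {
    chain_id: ChainId,
    log_indexer: LogIndexer,
}

impl ChainProcessorTask {
    fn new(chain_id: ChainId) -> Self {
        // The task forwards its chain id to the indexer so both components
        // tag their logs with the same identifier.
        Self { chain_id, log_indexer: LogIndexer::new(chain_id) }
    }
}

fn main() {
    let task = ChainProcessorTask::new(10);
    assert_eq!(task.chain_id, task.log_indexer.chain_id);
}
```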
30 changes: 19 additions & 11 deletions crates/supervisor/core/src/logindexer/indexer.rs
@@ -2,23 +2,26 @@ use crate::{
logindexer::{log_to_log_hash, payload_hash_to_log_hash},
syncnode::{BlockProvider, ManagedNodeError},
};
use alloy_primitives::ChainId;
use kona_interop::parse_log_to_executing_message;
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{LogStorageReader, LogStorageWriter, StorageError};
use kona_supervisor_types::{ExecutingMessage, Log};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::{debug, error};

/// The [`LogIndexer`] is responsible for processing L2 receipts, extracting [`ExecutingMessage`]s,
/// and persisting them to the state manager.
#[derive(Debug)]
pub struct LogIndexer<P, S> {
/// The chain ID of the rollup.
chain_id: ChainId,
/// Component that provides receipts for a given block hash.
pub block_provider: Arc<P>,
block_provider: Arc<P>,
/// Component that persists parsed log entries to storage.
pub log_storage: Arc<S>,
log_storage: Arc<S>,
/// Protects concurrent catch-up
is_catch_up_running: Mutex<bool>,
}
@@ -34,8 +37,8 @@ where
/// - `block_provider`: Shared reference to a component capable of fetching block ref and
/// receipts.
/// - `log_storage`: Shared reference to the storage layer for persisting parsed logs.
pub fn new(block_provider: Arc<P>, log_storage: Arc<S>) -> Self {
Self { block_provider, log_storage, is_catch_up_running: Mutex::new(false) }
pub fn new(chain_id: ChainId, block_provider: Arc<P>, log_storage: Arc<S>) -> Self {
Self { chain_id, block_provider, log_storage, is_catch_up_running: Mutex::new(false) }
}

/// Asynchronously initiates a background task to catch up and index logs
@@ -50,15 +53,20 @@
let mut running = self.is_catch_up_running.lock().await;

if *running {
debug!(target: "log_indexer", "Catch-up running log index");
debug!(target: "log_indexer", chain_id = %self.chain_id, "Catch-up running log index");
return;
}

*running = true;
drop(running); // release the lock while the job runs

if let Err(err) = self.index_log_upto(&block).await {
tracing::error!(target: "log_indexer", %err, "Log indexer catch-up failed");
error!(
target: "log_indexer",
chain_id = %self.chain_id,
%err,
"Log indexer catch-up failed"
);
}

let mut running = self.is_catch_up_running.lock().await;
@@ -227,7 +235,7 @@ mod tests {
.withf(|block, logs| block.number == 1 && logs.len() == 2)
.returning(|_, _| Ok(()));

let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db));
let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db));

let result = log_indexer.process_and_store_logs(&block_info).await;
assert!(result.is_ok());
@@ -257,7 +265,7 @@ mod tests {
.withf(|block, logs| block.number == 2 && logs.is_empty())
.returning(|_, _| Ok(()));

let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db));
let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db));

let result = log_indexer.process_and_store_logs(&block_info).await;
assert!(result.is_ok());
@@ -282,7 +290,7 @@ mod tests {

let mock_db = MockDb::new(); // No call expected

let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db));
let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db));

let result = log_indexer.process_and_store_logs(&block_info).await;
assert!(result.is_err());
@@ -320,7 +328,7 @@ mod tests {

mock_db.expect_store_block_logs().times(5).returning(move |_, _| Ok(()));

let indexer = Arc::new(LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)));
let indexer = Arc::new(LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)));

indexer.clone().sync_logs(target_block);

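The `sync_logs` path above guards catch-up with a `Mutex<bool>`: check the flag, set it, drop the lock while the job runs, then clear it. A minimal sketch of that single-flight pattern, assuming a Tokio runtime and using illustrative names (the real method spawns the job on a background task, which is omitted here):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct Indexer {
    chain_id: u64,
    is_catch_up_running: Mutex<bool>,
}

impl Indexer {
    async fn sync_logs(self: Arc<Self>) {
        let mut running = self.is_catch_up_running.lock().await;
        if *running {
            // A catch-up is already in flight for this chain; do nothing.
            println!("catch-up already running for chain {}", self.chain_id);
            return;
        }
        *running = true;
        drop(running); // release the lock while the job runs

        // ... fetch receipts and persist logs here ...

        let mut running = self.is_catch_up_running.lock().await;
        *running = false;
    }
}

#[tokio::main]
async fn main() {
    let indexer = Arc::new(Indexer { chain_id: 1, is_catch_up_running: Mutex::new(false) });
    indexer.clone().sync_logs().await;
    indexer.sync_logs().await; // runs again because the first call has finished
}
```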
56 changes: 33 additions & 23 deletions crates/supervisor/core/src/syncnode/resetter.rs
@@ -1,11 +1,12 @@
use super::{ManagedNodeClient, ManagedNodeError};
use alloy_eips::BlockNumHash;
use alloy_primitives::ChainId;
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError};
use kona_supervisor_types::SuperHead;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing::{debug, error, info};

#[derive(Debug)]
pub(super) struct Resetter<DB, C> {
@@ -26,27 +27,30 @@ where

/// Resets the node using the latest super head.
pub(crate) async fn reset(&self) -> Result<(), ManagedNodeError> {
// Get the chain ID so it can be included in the logs; this is useful for debugging.
// There is no performance impact, as the value is cached in the client.
let chain_id = self.client.chain_id().await?;
let _guard = self.reset_guard.lock().await;

info!(target: "resetter", "Resetting the node");
debug!(target: "resetter", %chain_id, "Resetting the node");

let local_safe = match self.get_latest_valid_local_safe().await {
let local_safe = match self.get_latest_valid_local_safe(chain_id).await {
Ok(block) => block,
// TODO: requires refactoring and corner-case handling
Err(ManagedNodeError::StorageError(StorageError::DatabaseNotInitialised)) => {
self.reset_pre_interop().await?;
self.reset_pre_interop(chain_id).await?;
return Ok(());
}
Err(err) => {
error!(target: "resetter", %err, "Failed to get latest valid derived block");
error!(target: "resetter", %chain_id, %err, "Failed to get latest valid derived block");
return Err(ManagedNodeError::ResetFailed);
}
};

let SuperHead { cross_unsafe, cross_safe, finalized, .. } = self
.db_provider
.get_super_head()
.inspect_err(|err| error!(target: "resetter", %err, "Failed to get super head"))?;
let SuperHead { cross_unsafe, cross_safe, finalized, .. } =
self.db_provider.get_super_head().inspect_err(
|err| error!(target: "resetter", %chain_id, %err, "Failed to get super head"),
)?;

// using the local safe block as the local unsafe as well
let local_unsafe = local_safe;
@@ -67,6 +71,7 @@
}

info!(target: "resetter",
%chain_id,
%local_unsafe,
%cross_unsafe,
%local_safe,
@@ -85,30 +90,30 @@
)
.await
.inspect_err(|err| {
error!(target: "resetter", %err, "Failed to reset managed node");
error!(target: "resetter", %chain_id, %err, "Failed to reset managed node");
})?;
Ok(())
}

async fn reset_pre_interop(&self) -> Result<(), ManagedNodeError> {
info!(target: "resetter", "Resetting the node to pre-interop state");
async fn reset_pre_interop(&self, chain_id: ChainId) -> Result<(), ManagedNodeError> {
info!(target: "resetter", %chain_id, "Resetting the node to pre-interop state");

self.client.reset_pre_interop().await.inspect_err(|err| {
error!(target: "resetter", %err, "Failed to reset managed node to pre-interop state");
error!(target: "resetter", %chain_id, %err, "Failed to reset managed node to pre-interop state");
})?;
Ok(())
}

async fn get_latest_valid_local_safe(&self) -> Result<BlockInfo, ManagedNodeError> {
let latest_derivation_state = self.db_provider.latest_derivation_state().inspect_err(
|err| error!(target: "resetter", %err, "Failed to get latest derivation state"),
)?;

async fn get_latest_valid_local_safe(
&self,
chain_id: ChainId,
) -> Result<BlockInfo, ManagedNodeError> {
let latest_derivation_state = self.db_provider.latest_derivation_state()?;
let mut local_safe = latest_derivation_state.derived;

loop {
let node_block = self.client.block_ref_by_number(local_safe.number).await.inspect_err(
|err| error!(target: "resetter", %err, "Failed to get block by number"),
|err| error!(target: "resetter", %chain_id, %err, "Failed to get block by number"),
)?;

// If the local safe block matches the node block, we can return the super
Expand All @@ -122,7 +127,7 @@ where
let source_block = self
.db_provider
.derived_to_source(local_safe.id())
.inspect_err(|err| error!(target: "resetter", %err, "Failed to get source block for the local safe head ref"))?;
.inspect_err(|err| error!(target: "resetter", %chain_id, %err, "Failed to get source block for the local safe head ref"))?;

// Get the previous source block id
let prev_source_id =
Expand All @@ -131,7 +136,7 @@ where
// If the previous source block id is 0, we cannot reset further. This should not happen
// in prod, added for safety during dev environment.
if prev_source_id.number == 0 {
error!(target: "resetter", "Source block number is 0, cannot reset further");
error!(target: "resetter", %chain_id, "Source block number is 0, cannot reset further");
return Err(ManagedNodeError::ResetFailed);
}

Expand All @@ -143,7 +148,7 @@ where
.db_provider
.latest_derived_block_at_source(prev_source_id)
.inspect_err(|err| {
error!(target: "resetter", %err, "Failed to get latest derived block for the previous source block")
error!(target: "resetter", %chain_id, %err, "Failed to get latest derived block for the previous source block")
})?;
}
}
@@ -228,6 +233,7 @@ mod tests {
db.expect_get_super_head().returning(move || Ok(super_head));

let mut client = MockClient::new();
client.expect_chain_id().returning(move || Ok(1));
client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap()));

client.expect_reset().returning(|_, _, _, _, _| Ok(()));
@@ -242,7 +248,8 @@
let mut db = MockDb::new();
db.expect_latest_derivation_state().returning(|| Err(StorageError::LockPoisoned));

let client = MockClient::new();
let mut client = MockClient::new();
client.expect_chain_id().returning(move || Ok(1));

let resetter = Resetter::new(Arc::new(client), Arc::new(db));

@@ -261,6 +268,7 @@
})
});
let mut client = MockClient::new();
client.expect_chain_id().returning(move || Ok(1));
client
.expect_block_ref_by_number()
.returning(|_| Err(ClientError::Authentication(AuthenticationError::InvalidHeader)));
@@ -296,6 +304,7 @@ mod tests {
.returning(move |_| Ok(last_valid_derived_block));

let mut client = MockClient::new();
client.expect_chain_id().returning(move || Ok(1));
// Return a block that does not match local_safe
client
.expect_block_ref_by_number()
@@ -330,6 +339,7 @@ mod tests {
db.expect_get_super_head().returning(move || Ok(super_head));

let mut client = MockClient::new();
client.expect_chain_id().returning(move || Ok(1));
client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap()));
client.expect_reset().returning(|_, _, _, _, _| {
Err(ClientError::Authentication(AuthenticationError::InvalidJwt))
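The comment in `reset()` relies on `client.chain_id()` being cached, so the extra call is cheap. A hedged sketch of one common way to do such caching with `tokio::sync::OnceCell`; this illustrates the pattern only, not the crate's actual client, and `fetch_chain_id_from_rpc` is hypothetical:

```rust
use tokio::sync::OnceCell;

struct Client {
    chain_id: OnceCell<u64>,
}

impl Client {
    fn new() -> Self {
        Self { chain_id: OnceCell::new() }
    }

    async fn chain_id(&self) -> Result<u64, String> {
        self.chain_id
            .get_or_try_init(|| async {
                // Placeholder for the real JSON-RPC round trip; only the
                // first call reaches this point.
                fetch_chain_id_from_rpc().await
            })
            .await
            .copied()
    }
}

async fn fetch_chain_id_from_rpc() -> Result<u64, String> {
    Ok(10)
}

#[tokio::main]
async fn main() {
    let client = Client::new();
    assert_eq!(client.chain_id().await.unwrap(), 10);
    // The second call is served from the cache.
    assert_eq!(client.chain_id().await.unwrap(), 10);
}
```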
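`get_latest_valid_local_safe` walks back through derivation history: if the node's block at a given height no longer matches the locally derived block (for example after an L1 reorg), it steps to the previous source block and retries with the newest block derived from it. A simplified, runnable sketch of that loop, with closures standing in for the real storage and client calls and hashes shortened to a single byte:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
struct Block {
    number: u64,
    hash: u8,
}

fn latest_valid_local_safe(
    start: Block,
    node_block_by_number: impl Fn(u64) -> Block,
    derived_to_source: impl Fn(Block) -> Block,
    latest_derived_at_source: impl Fn(u64) -> Block,
) -> Result<Block, &'static str> {
    let mut local_safe = start;
    loop {
        // The node still has the block we derived: nothing to rewind.
        if node_block_by_number(local_safe.number) == local_safe {
            return Ok(local_safe);
        }
        // Mismatch (e.g. after a reorg): find the source block it was derived from.
        let source = derived_to_source(local_safe);
        if source.number == 0 {
            return Err("cannot rewind past the first source block");
        }
        // Retry with the newest block derived from the previous source block.
        local_safe = latest_derived_at_source(source.number - 1);
    }
}

fn main() {
    // Derived block 5 was reorged away on the node; block 4 still matches.
    let derived = |n: u64| Block { number: n, hash: n as u8 };
    let result = latest_valid_local_safe(
        derived(5),
        |n| if n == 5 { Block { number: 5, hash: 0xff } } else { derived(n) },
        |b| Block { number: b.number + 100, hash: 0 }, // source of derived block N
        |source_number| derived(source_number - 100),  // newest derived at that source
    );
    assert_eq!(result, Ok(derived(4)));
}
```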