Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions crates/supervisor/core/src/chain_processor/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{ChainProcessorError, ChainProcessorTask};
use crate::{event::ChainEvent, syncnode::ManagedNodeProvider};
use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider};
use alloy_primitives::ChainId;
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter,
Expand All @@ -18,6 +18,9 @@ use tracing::warn;
// chain processor will support multiple managed nodes in the future.
#[derive(Debug)]
pub struct ChainProcessor<P, W> {
// The rollup configuration for the chain
rollup_config: RollupConfig,

// The chainId that this processor is associated with
chain_id: ChainId,

Expand Down Expand Up @@ -51,13 +54,15 @@ where
{
/// Creates a new instance of [`ChainProcessor`].
pub fn new(
rollup_config: RollupConfig,
chain_id: ChainId,
managed_node: Arc<P>,
state_manager: Arc<W>,
cancel_token: CancellationToken,
) -> Self {
// todo: validate chain_id against managed_node
Self {
rollup_config,
chain_id,
event_tx: None,
metrics_enabled: None,
Expand Down Expand Up @@ -94,11 +99,12 @@ where
}

// todo: figure out value for buffer size
let (event_tx, event_rx) = mpsc::channel::<ChainEvent>(100);
let (event_tx, event_rx) = mpsc::channel::<ChainEvent>(1000);
self.event_tx = Some(event_tx.clone());
self.managed_node.start_subscription(event_tx.clone()).await?;

let mut task = ChainProcessorTask::new(
self.rollup_config.clone(),
self.chain_id,
self.managed_node.clone(),
self.state_manager.clone(),
Expand Down Expand Up @@ -235,6 +241,11 @@ mod tests {
pub Db {}

impl LogStorageWriter for Db {
fn initialise_log_storage(
&self,
block: BlockInfo,
) -> Result<(), StorageError>;

fn store_block_logs(
&self,
block: &BlockInfo,
Expand All @@ -250,6 +261,11 @@ mod tests {
}

impl DerivationStorageWriter for Db {
fn initialise_derivation_storage(
&self,
incoming_pair: DerivedRefPair,
) -> Result<(), StorageError>;

fn save_derived_block(
&self,
incoming_pair: DerivedRefPair,
Expand Down Expand Up @@ -285,8 +301,14 @@ mod tests {
let storage = Arc::new(MockDb::new());
let cancel_token = CancellationToken::new();

let mut processor =
ChainProcessor::new(1, Arc::clone(&mock_node), Arc::clone(&storage), cancel_token);
let rollup_config = RollupConfig::default();
let mut processor = ChainProcessor::new(
rollup_config,
1,
Arc::clone(&mock_node),
Arc::clone(&storage),
cancel_token,
);

assert!(processor.start().await.is_ok());

Expand Down
92 changes: 82 additions & 10 deletions crates/supervisor/core/src/chain_processor/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::Metrics;
use crate::{ChainProcessorError, LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider};
use crate::{
ChainProcessorError, LogIndexer, config::RollupConfig, event::ChainEvent,
syncnode::ManagedNodeProvider,
};
use alloy_primitives::ChainId;
use kona_interop::{BlockReplacement, DerivedRefPair};
use kona_protocol::BlockInfo;
Expand All @@ -15,6 +18,7 @@ use tracing::{debug, error, info};
/// It listens for events emitted by the managed node and handles them accordingly.
#[derive(Debug)]
pub struct ChainProcessorTask<P, W> {
_rollup_config: RollupConfig,
chain_id: ChainId,
metrics_enabled: Option<bool>,

Expand All @@ -41,6 +45,7 @@ where
{
/// Creates a new [`ChainProcessorTask`].
pub fn new(
rollup_config: RollupConfig,
chain_id: u64,
managed_node: Arc<P>,
state_manager: Arc<W>,
Expand All @@ -49,6 +54,7 @@ where
) -> Self {
let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone());
Self {
_rollup_config: rollup_config,
chain_id,
metrics_enabled: None,
cancel_token,
Expand Down Expand Up @@ -288,7 +294,7 @@ where
);
match self.state_manager.save_source_block(origin) {
Ok(_) => Ok(()),
Err(StorageError::BlockOutOfOrder) => {
Err(StorageError::BlockOutOfOrder | StorageError::ConflictError(_)) => {
error!(
target: "chain_processor",
chain_id = self.chain_id,
Expand Down Expand Up @@ -322,7 +328,7 @@ where
);
match self.state_manager.save_derived_block(derived_ref_pair) {
Ok(_) => Ok(derived_ref_pair.derived),
Err(StorageError::BlockOutOfOrder) => {
Err(StorageError::BlockOutOfOrder | StorageError::ConflictError(_)) => {
error!(
target: "chain_processor",
chain_id = self.chain_id,
Expand Down Expand Up @@ -488,6 +494,11 @@ mod tests {
pub Db {}

impl LogStorageWriter for Db {
fn initialise_log_storage(
&self,
block: BlockInfo,
) -> Result<(), StorageError>;

fn store_block_logs(
&self,
block: &BlockInfo,
Expand All @@ -503,6 +514,11 @@ mod tests {
}

impl DerivationStorageWriter for Db {
fn initialise_derivation_storage(
&self,
incoming_pair: DerivedRefPair,
) -> Result<(), StorageError>;

fn save_derived_block(
&self,
incoming_pair: DerivedRefPair,
Expand Down Expand Up @@ -552,7 +568,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

tx.send(ChainEvent::UnsafeBlock { block }).await.unwrap();

Expand Down Expand Up @@ -597,7 +621,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send unsafe block event
tx.send(ChainEvent::DerivedBlock { derived_ref_pair: block_pair }).await.unwrap();
Expand Down Expand Up @@ -632,7 +664,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send derivation origin update event
tx.send(ChainEvent::DerivationOriginUpdate { origin }).await.unwrap();
Expand Down Expand Up @@ -678,7 +718,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send FinalizedSourceUpdate event
tx.send(ChainEvent::FinalizedSourceUpdate { finalized_source_block }).await.unwrap();
Expand Down Expand Up @@ -715,7 +763,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send FinalizedSourceUpdate event
tx.send(ChainEvent::FinalizedSourceUpdate { finalized_source_block }).await.unwrap();
Expand Down Expand Up @@ -749,7 +805,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send derivation origin update event
tx.send(ChainEvent::CrossUnsafeUpdate { block }).await.unwrap();
Expand Down Expand Up @@ -786,7 +850,15 @@ mod tests {
let cancel_token = CancellationToken::new();
let (tx, rx) = mpsc::channel(10);

let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx);
let rollup_config = RollupConfig::default();
let task = ChainProcessorTask::new(
rollup_config,
1,
managed_node,
writer,
cancel_token.clone(),
rx,
);

// Send derivation origin update event
tx.send(ChainEvent::CrossSafeUpdate {
Expand Down
4 changes: 2 additions & 2 deletions crates/supervisor/core/src/config/rollup_config_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::HashMap;
use crate::SupervisorError;

/// Genesis provides the genesis information relevant for Interop.
#[derive(Debug, Clone)]
#[derive(Debug, Default, Clone)]
pub struct Genesis {
/// The L1 [`BlockInfo`] that the rollup starts after.
pub l1: BlockInfo,
Expand Down Expand Up @@ -36,7 +36,7 @@ impl Genesis {
}

/// RollupConfig contains the configuration for the Optimism rollup.
#[derive(Debug, Clone)]
#[derive(Debug, Default, Clone)]
pub struct RollupConfig {
/// Genesis anchor information for the rollup.
pub genesis: Genesis,
Expand Down
1 change: 1 addition & 0 deletions crates/supervisor/core/src/logindexer/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ mod tests {
pub Db {}

impl LogStorageWriter for Db {
fn initialise_log_storage(&self, _block: BlockInfo) -> Result<(), StorageError>;
fn store_block_logs(&self, block: &BlockInfo, logs: Vec<Log>) -> Result<(), StorageError>;
}

Expand Down
22 changes: 17 additions & 5 deletions crates/supervisor/core/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{
ChainDb, ChainDbFactory, DerivationStorageReader, FinalizedL1Storage, HeadRefStorageReader,
LogStorageReader,
ChainDb, ChainDbFactory, DerivationStorageReader, DerivationStorageWriter, FinalizedL1Storage,
HeadRefStorageReader, LogStorageReader, LogStorageWriter,
};
use kona_supervisor_types::{SuperHead, parse_access_list};
use op_alloy_rpc_types::SuperchainDAError;
Expand Down Expand Up @@ -142,7 +142,9 @@
for (chain_id, config) in self.config.rollup_config_set.rollups.iter() {
// Initialise the database for each chain.
let db = self.database_factory.get_or_create_db(*chain_id)?;
db.initialise(config.genesis.get_anchor())?;
let anchor = config.genesis.get_anchor();
db.initialise_log_storage(anchor.derived)?;
db.initialise_derivation_storage(anchor)?;

Check warning on line 147 in crates/supervisor/core/src/supervisor.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/supervisor.rs#L145-L147

Added lines #L145 - L147 were not covered by tests
info!(target: "supervisor_service", chain_id, "Database initialized successfully");
}
Ok(())
Expand All @@ -159,9 +161,19 @@
chain_id
)))?;

let rollup_config =
self.config.rollup_config_set.get(*chain_id).ok_or(SupervisorError::Initialise(
format!("no rollup config found for chain {}", chain_id),
))?;

Check warning on line 167 in crates/supervisor/core/src/supervisor.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/supervisor.rs#L164-L167

Added lines #L164 - L167 were not covered by tests

// initialise chain processor for the chain.
let mut processor =
ChainProcessor::new(*chain_id, managed_node.clone(), db, self.cancel_token.clone());
let mut processor = ChainProcessor::new(
rollup_config.clone(),
*chain_id,
managed_node.clone(),
db,
self.cancel_token.clone(),
);

Check warning on line 176 in crates/supervisor/core/src/supervisor.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/supervisor.rs#L170-L176

Added lines #L170 - L176 were not covered by tests

// todo: enable metrics only if configured
processor = processor.with_metrics();
Expand Down
Loading