Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
27 changes: 22 additions & 5 deletions crates/supervisor/core/src/chain_processor/chain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{ChainProcessorError, ChainProcessorTask};
use crate::{event::ChainEvent, syncnode::ManagedNodeProvider};
use alloy_primitives::ChainId;
use kona_supervisor_storage::{DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter};
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter,
};
use std::sync::Arc;
use tokio::{
sync::{Mutex, mpsc},
Expand Down Expand Up @@ -41,7 +43,11 @@
impl<P, W> ChainProcessor<P, W>
where
P: ManagedNodeProvider + 'static,
W: LogStorageWriter + DerivationStorageWriter + HeadRefStorageWriter + 'static,
W: LogStorageWriter
+ LogStorageReader
+ DerivationStorageWriter
+ HeadRefStorageWriter
+ 'static,
{
/// Creates a new instance of [`ChainProcessor`].
pub fn new(
Expand Down Expand Up @@ -118,8 +124,8 @@
use crate::{
event::ChainEvent,
syncnode::{
ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber,
ReceiptProvider,
BlockProvider, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError,
NodeSubscriber,
},
};
use alloy_primitives::B256;
Expand Down Expand Up @@ -161,10 +167,14 @@
}

#[async_trait]
impl ReceiptProvider for MockNode {
impl BlockProvider for MockNode {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError> {
Ok(vec![]) // dummy
}

async fn block_by_number(&self, _number: u64) -> Result<BlockInfo, ManagedNodeError> {
Ok(BlockInfo::default())
}

Check warning on line 177 in crates/supervisor/core/src/chain_processor/chain.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/chain.rs#L175-L177

Added lines #L175 - L177 were not covered by tests
}

#[async_trait]
Expand Down Expand Up @@ -232,6 +242,13 @@
) -> Result<(), StorageError>;
}

impl LogStorageReader for Db {
fn get_block(&self, block_number: u64) -> Result<BlockInfo, StorageError>;
fn get_latest_block(&self) -> Result<BlockInfo, StorageError>;
fn get_log(&self,block_number: u64,log_index: u32) -> Result<Log, StorageError>;
fn get_logs(&self, block_number: u64) -> Result<Vec<Log>, StorageError>;
}

impl DerivationStorageWriter for Db {
fn save_derived_block(
&self,
Expand Down
25 changes: 19 additions & 6 deletions crates/supervisor/core/src/chain_processor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy_primitives::ChainId;
use kona_interop::{BlockReplacement, DerivedRefPair};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError,
DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError,
};
use std::{fmt::Debug, sync::Arc};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -33,7 +33,11 @@ pub struct ChainProcessorTask<P, W> {
impl<P, W> ChainProcessorTask<P, W>
where
P: ManagedNodeProvider + 'static,
W: LogStorageWriter + DerivationStorageWriter + HeadRefStorageWriter + 'static,
W: LogStorageWriter
+ LogStorageReader
+ DerivationStorageWriter
+ HeadRefStorageWriter
+ 'static,
{
/// Creates a new [`ChainProcessorTask`].
pub fn new(
Expand Down Expand Up @@ -360,7 +364,8 @@ where
"Processing unsafe block"
);

self.log_indexer.process_and_store_logs(&block).await?;
self.log_indexer.clone().sync_logs(block);

Ok(block)
}

Expand Down Expand Up @@ -403,8 +408,8 @@ mod tests {
use crate::{
event::ChainEvent,
syncnode::{
ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber,
ReceiptProvider,
BlockProvider, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError,
NodeSubscriber,
},
};
use alloy_primitives::B256;
Expand Down Expand Up @@ -433,8 +438,9 @@ mod tests {
}

#[async_trait]
impl ReceiptProvider for Node {
impl BlockProvider for Node {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError>;
async fn block_by_number(&self, _number: u64) -> Result<BlockInfo, ManagedNodeError>;
}

#[async_trait]
Expand Down Expand Up @@ -489,6 +495,13 @@ mod tests {
) -> Result<(), StorageError>;
}

impl LogStorageReader for Db {
fn get_block(&self, block_number: u64) -> Result<BlockInfo, StorageError>;
fn get_latest_block(&self) -> Result<BlockInfo, StorageError>;
fn get_log(&self,block_number: u64,log_index: u32) -> Result<Log, StorageError>;
fn get_logs(&self, block_number: u64) -> Result<Vec<Log>, StorageError>;
}

impl DerivationStorageWriter for Db {
fn save_derived_block(
&self,
Expand Down
Loading