Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use alloy_primitives::Bytes;
use alloy_primitives::{hex, Bytes};
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
Expand Down Expand Up @@ -91,7 +91,10 @@ where
if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) <
origin.number
{
warn!(target: "channel-bank", "Channel {:?} timed out", frame.id);
warn!(
target: "channel-bank",
"Channel (ID: {}) timed out", hex::encode(frame.id)
);
return Ok(());
}

Expand Down Expand Up @@ -134,7 +137,10 @@ where
let origin = self.origin().ok_or(StageError::MissingOrigin)?;
if channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) < origin.number
{
warn!(target: "channel-bank", "Channel {:?} timed out", first);
warn!(
target: "channel-bank",
"Channel (ID: {}) timed out", hex::encode(first)
);
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - channel.open_block_number()) as f64);
self.channels.remove(&first);
self.channel_queue.pop_front();
Expand Down Expand Up @@ -267,6 +273,7 @@ mod tests {
test_utils::{CollectingLayer, MockChannelBankProvider, TraceStorage},
};
use alloc::vec;
use kona_primitives::{BASE_MAINNET_CONFIG, OP_MAINNET_CONFIG};
use tracing::Level;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -341,4 +348,48 @@ mod tests {
let err = channel_bank.next_data().await.unwrap_err();
assert_eq!(err, StageError::NotEnoughData);
}

#[tokio::test]
async fn test_channel_timeout() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

const ROLLUP_CONFIGS: [RollupConfig; 2] = [OP_MAINNET_CONFIG, BASE_MAINNET_CONFIG];

for cfg in ROLLUP_CONFIGS {
let frames = new_test_frames(2);
let mock = MockChannelBankProvider::new(frames.into_iter().map(Ok).collect::<Vec<_>>());
let cfg = Arc::new(cfg);
let mut channel_bank = ChannelBank::new(cfg.clone(), mock);

// Ingest first frame
let err = channel_bank.next_data().await.unwrap_err();
assert_eq!(err, StageError::NotEnoughData);

for _ in 0..cfg.channel_timeout + 1 {
channel_bank.advance_origin().await.unwrap();
}

// There should be an in-progress channel.
assert_eq!(channel_bank.channels.len(), 1);
assert_eq!(channel_bank.channel_queue.len(), 1);

// Should be `Ok(())`, channel timed out.
channel_bank.next_data().await.unwrap();

// The channel should have been pruned.
assert_eq!(channel_bank.channels.len(), 0);
assert_eq!(channel_bank.channel_queue.len(), 0);

// Ensure the channel was successfully timed out.
let (_, warning_trace) = trace_store
.lock()
.iter()
.find(|(l, _)| matches!(l, &Level::WARN))
.cloned()
.unwrap();
assert!(warning_trace.contains("timed out"));
}
}
}
4 changes: 4 additions & 0 deletions crates/derive/src/stages/test_utils/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl OriginProvider for MockChannelBankProvider {
#[async_trait]
impl OriginAdvancer for MockChannelBankProvider {
async fn advance_origin(&mut self) -> StageResult<()> {
self.block_info = self.block_info.map(|mut bi| {
bi.number += 1;
bi
});
Ok(())
}
}
Expand Down