diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 03f188181d..5b5e22de0c 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -7,7 +7,7 @@ use crate::{ types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; -use alloy_primitives::Bytes; +use alloy_primitives::{hex, Bytes}; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; @@ -91,7 +91,10 @@ where if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) < origin.number { - warn!(target: "channel-bank", "Channel {:?} timed out", frame.id); + warn!( + target: "channel-bank", + "Channel (ID: {}) timed out", hex::encode(frame.id) + ); return Ok(()); } @@ -134,7 +137,10 @@ where let origin = self.origin().ok_or(StageError::MissingOrigin)?; if channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) < origin.number { - warn!(target: "channel-bank", "Channel {:?} timed out", first); + warn!( + target: "channel-bank", + "Channel (ID: {}) timed out", hex::encode(first) + ); crate::observe!(CHANNEL_TIMEOUTS, (origin.number - channel.open_block_number()) as f64); self.channels.remove(&first); self.channel_queue.pop_front(); @@ -267,6 +273,7 @@ mod tests { test_utils::{CollectingLayer, MockChannelBankProvider, TraceStorage}, }; use alloc::vec; + use kona_primitives::{BASE_MAINNET_CONFIG, OP_MAINNET_CONFIG}; use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -341,4 +348,48 @@ mod tests { let err = channel_bank.next_data().await.unwrap_err(); assert_eq!(err, StageError::NotEnoughData); } + + #[tokio::test] + async fn test_channel_timeout() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + const ROLLUP_CONFIGS: [RollupConfig; 2] = [OP_MAINNET_CONFIG, BASE_MAINNET_CONFIG]; + + for cfg in ROLLUP_CONFIGS { + let frames = new_test_frames(2); + let mock = MockChannelBankProvider::new(frames.into_iter().map(Ok).collect::>()); + let cfg = Arc::new(cfg); + let mut channel_bank = ChannelBank::new(cfg.clone(), mock); + + // Ingest first frame + let err = channel_bank.next_data().await.unwrap_err(); + assert_eq!(err, StageError::NotEnoughData); + + for _ in 0..cfg.channel_timeout + 1 { + channel_bank.advance_origin().await.unwrap(); + } + + // There should be an in-progress channel. + assert_eq!(channel_bank.channels.len(), 1); + assert_eq!(channel_bank.channel_queue.len(), 1); + + // Should be `Ok(())`, channel timed out. + channel_bank.next_data().await.unwrap(); + + // The channel should have been pruned. + assert_eq!(channel_bank.channels.len(), 0); + assert_eq!(channel_bank.channel_queue.len(), 0); + + // Ensure the channel was successfully timed out. + let (_, warning_trace) = trace_store + .lock() + .iter() + .find(|(l, _)| matches!(l, &Level::WARN)) + .cloned() + .unwrap(); + assert!(warning_trace.contains("timed out")); + } + } } diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 357ae9e769..bc7d511bd8 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -33,6 +33,10 @@ impl OriginProvider for MockChannelBankProvider { #[async_trait] impl OriginAdvancer for MockChannelBankProvider { async fn advance_origin(&mut self) -> StageResult<()> { + self.block_info = self.block_info.map(|mut bi| { + bi.number += 1; + bi + }); Ok(()) } }