Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ verify an [L2 output root][g-output-root] from the L1 inputs it was [derived fro

- [`common`](./crates/common): A suite of utilities for developing `client` programs to be ran on top of Fault Proof VMs.
- [`preimage`](./crates/preimage): High-level interfaces to the [`PreimageOracle`][fpp-specs] ABI.
- [`derive`](./crates/derive): `no_std` compatible implementation of the [derivation pipeline][g-derivation-pipeline].

## Book

Expand Down
1 change: 1 addition & 0 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ alloy-primitives = { version = "0.6.3", default-features = false, features = ["r
alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] }
alloy-sol-types = { version = "0.6.3", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"

# Optional
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/derive/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# `kona-derive`

> **Notice**: This crate is a WIP.

A `no_std` compatible implementation of the OP Stack's
[derivation pipeline](https://specs.optimism.io/protocol/derivation.html#l2-chain-derivation-specification).
5 changes: 4 additions & 1 deletion crates/derive/src/params.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module contains the parameters and identifying types for the derivation pipeline.

/// Count the tagging info as 200 in terms of buffer size.
pub const FRAME_OVERHEAD: u64 = 200;
pub const FRAME_OVERHEAD: usize = 200;

/// The version of the derivation pipeline.
pub const DERIVATION_VERSION_0: u8 = 0;
Expand All @@ -15,6 +15,9 @@ pub const MAX_SPAN_BATCH_BYTES: u64 = MAX_RLP_BYTES_PER_CHANNEL;
/// a channel. This limit is set when decoding the RLP.
pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000;

/// The maximum total size of the channel bank, in bytes. When the bank grows beyond
/// this, whole channels are pruned from it in FIFO order.
pub const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;

/// [CHANNEL_ID_LENGTH] is the length of a channel ID, in bytes.
pub const CHANNEL_ID_LENGTH: usize = 16;

Expand Down
203 changes: 203 additions & 0 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
@@ -1 +1,204 @@
//! This module contains the `ChannelBank` struct.

use super::{frame_queue::FrameQueue, l1_retrieval::L1Retrieval};
use crate::{
params::{ChannelID, MAX_CHANNEL_BANK_SIZE},
traits::{ChainProvider, DataAvailabilityProvider, ResettableStage},
types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_primitives::Bytes;
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use hashbrown::HashMap;

/// [ChannelBank] is a stateful stage that does the following:
/// 1. Unmarshalls frames from L1 transaction data
/// 2. Applies those frames to a channel
/// 3. Attempts to read from the channel when it is ready
/// 4. Prunes channels (not frames) when the channel bank is too large.
///
/// Note: we prune before we ingest data.
/// As we switch between ingesting data & reading, the prune step occurs at an odd point:
/// the channel bank is only guaranteed to be under its size limit between successive calls
/// to `ingest_frame`, so a single ingest followed by a read may briefly leave the bank
/// over-sized.
///
/// [ChannelBank] buffers channel frames, and emits full channel data.
pub struct ChannelBank<DAP, CP>
where
    DAP: DataAvailabilityProvider,
    CP: ChainProvider,
{
    /// The rollup configuration.
    cfg: RollupConfig,
    /// Map of channels by ID.
    channels: HashMap<ChannelID, Channel>,
    /// Channels in FIFO order.
    channel_queue: VecDeque<ChannelID>,
    /// The previous stage of the derivation pipeline.
    prev: FrameQueue<DAP, CP>,
    /// Chain provider.
    chain_provider: CP,
}

impl<DAP, CP> ChannelBank<DAP, CP>
where
    DAP: DataAvailabilityProvider,
    CP: ChainProvider,
{
    /// Create a new [ChannelBank] stage.
    pub fn new(cfg: RollupConfig, prev: FrameQueue<DAP, CP>, chain_provider: CP) -> Self {
        Self {
            cfg,
            channels: HashMap::new(),
            channel_queue: VecDeque::new(),
            prev,
            chain_provider,
        }
    }

    /// Returns the L1 origin [BlockInfo] of the previous stage, if one is available.
    pub fn origin(&self) -> Option<&BlockInfo> {
        self.prev.origin()
    }

    /// Prunes the channel bank until its total size is at or below [MAX_CHANNEL_BANK_SIZE].
    ///
    /// Channels are evicted whole (never frame-by-frame), oldest first: the channel at the
    /// head of `channel_queue` failed to be read, so it is the first to go.
    ///
    /// # Errors
    ///
    /// Returns an error if `channel_queue` and `channels` are inconsistent — an empty queue
    /// while still over the limit, or a queued ID with no matching channel.
    pub fn prune(&mut self) -> StageResult<()> {
        // Total number of bytes currently buffered across all channels.
        let mut total_size = self.channels.iter().fold(0, |acc, (_, c)| acc + c.size());
        // Prune until it is reasonable again.
        while total_size > MAX_CHANNEL_BANK_SIZE {
            let id = self
                .channel_queue
                .pop_front()
                .ok_or(anyhow!("No channel to prune"))?;
            let channel = self
                .channels
                .remove(&id)
                .ok_or(anyhow!("Could not find channel"))?;
            total_size -= channel.size();
        }
        Ok(())
    }

    /// Adds new L1 data to the channel bank. Should only be called after all data has been
    /// read out of the bank.
    ///
    /// Frames for timed-out channels, and frames the channel itself rejects, are silently
    /// dropped. After a successful ingest the bank is pruned back below
    /// [MAX_CHANNEL_BANK_SIZE].
    pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
        let origin = *self.origin().ok_or(anyhow!("No origin"))?;

        // Look up the frame's channel, creating (and enqueueing) it on first sight.
        let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
            let channel = Channel::new(frame.id, origin);
            self.channel_queue.push_back(frame.id);
            channel
        });

        // Check if the channel is not timed out. If it has, ignore the frame.
        if current_channel.open_block_number() + self.cfg.channel_timeout < origin.number {
            return Ok(());
        }

        // Ingest the frame. If it fails, ignore the frame.
        if current_channel.add_frame(frame, origin).is_err() {
            return Ok(());
        }

        self.prune()
    }

    /// Read the raw data of the first channel, if it's timed-out or closed.
    ///
    /// Returns `Ok(None)` when a timed-out channel was dropped from the head of the queue
    /// (the caller should retry), and [StageError::Eof] when there is nothing new to read.
    pub fn read(&mut self) -> StageResult<Option<Bytes>> {
        // Bail if there are no channels to read from.
        if self.channel_queue.is_empty() {
            return Err(StageError::Eof);
        }

        // Return an `Ok(None)` if the first channel is timed out. There may be more timed
        // out channels at the head of the queue, and we want to remove them all; returning
        // `None` lets the caller loop back around to drop the next one.
        let first = self.channel_queue[0];
        let channel = self
            .channels
            .get(&first)
            .ok_or(anyhow!("Channel not found"))?;
        let origin = self.origin().ok_or(anyhow!("No origin present"))?;

        if channel.open_block_number() + self.cfg.channel_timeout < origin.number {
            self.channels.remove(&first);
            self.channel_queue.pop_front();
            return Ok(None);
        }

        // At this point we have removed all timed out channels from the front of the
        // `channel_queue`. Pre-Canyon we simply check the first index. Post-Canyon we scan
        // the entire `channel_queue` for the first ready channel; if none is available, we
        // return `Err(StageError::Eof)`.
        // Canyon is activated when the first L1 block whose time >= CanyonTime, not on the
        // L2 timestamp.
        if !self.cfg.is_canyon_active(origin.timestamp) {
            return self.try_read_channel_at_index(0).map(Some);
        }

        let channel_data =
            (0..self.channel_queue.len()).find_map(|i| self.try_read_channel_at_index(i).ok());
        match channel_data {
            Some(data) => Ok(Some(data)),
            None => Err(StageError::Eof),
        }
    }

    /// Pulls the next piece of data from the channel bank. Note that it attempts to pull
    /// data out of the channel bank prior to loading data in (unlike most other stages).
    /// This is to maintain consistency around channel bank pruning, which depends upon the
    /// order of operations.
    pub async fn next_data(&mut self) -> StageResult<Option<Bytes>> {
        match self.read() {
            Err(StageError::Eof) => {
                // Continue below - we will attempt to load data into the channel bank.
            }
            Err(e) => {
                return Err(anyhow!("Error fetching next data from channel bank: {:?}", e).into());
            }
            data => return data,
        };

        // Load the data into the channel bank.
        //
        // BUGFIX: the `StageResult` of `ingest_frame` was previously dropped, silently
        // swallowing pruning/bookkeeping errors. Propagate it instead.
        let frame = self.prev.next_frame().await?;
        self.ingest_frame(frame)?;
        Err(StageError::NotEnoughData)
    }

    /// Attempts to read the channel at the specified index in `channel_queue`. If the
    /// channel is not ready or timed out, it will return [StageError::Eof].
    /// If the channel read was successful, the channel is removed from both the channel
    /// map and the channel queue.
    fn try_read_channel_at_index(&mut self, index: usize) -> StageResult<Bytes> {
        let channel_id = self.channel_queue[index];
        let channel = self
            .channels
            .get(&channel_id)
            .ok_or(anyhow!("Channel not found"))?;
        let origin = self.origin().ok_or(anyhow!("No origin present"))?;

        let timed_out = channel.open_block_number() + self.cfg.channel_timeout < origin.number;
        if timed_out || !channel.is_ready() {
            return Err(StageError::Eof);
        }

        let frame_data = channel.frame_data();
        self.channels.remove(&channel_id);
        self.channel_queue.remove(index);

        frame_data.map_err(StageError::Custom)
    }
}

#[async_trait]
impl<DAP, CP> ResettableStage for ChannelBank<DAP, CP>
where
    DAP: DataAvailabilityProvider + Send,
    CP: ChainProvider + Send,
{
    /// Resets the channel bank, dropping all buffered channels and the FIFO queue.
    ///
    /// `_base` and `_cfg` are unused here — the bank keeps no origin- or config-derived
    /// state of its own (underscore-prefixed to avoid unused-variable warnings).
    /// Returns [StageError::Eof] after clearing.
    async fn reset(&mut self, _base: BlockInfo, _cfg: SystemConfig) -> StageResult<()> {
        self.channels.clear();
        self.channel_queue = VecDeque::with_capacity(10);
        Err(StageError::Eof)
    }
}
Loading