Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 51 additions & 23 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,43 @@ where

let mut i = 0;
while i < self.queue.len() - 1 {
let frame = &self.queue[i];
let prev_frame = &self.queue[i];
let next_frame = &self.queue[i + 1];

// If the frames are in the same channel, and the current is last, drop the next frame.
if frame.id == next_frame.id && frame.is_last {
self.queue.remove(i + 1);
continue;
}
let extends_channel = prev_frame.id == next_frame.id;

// If the frames are in the same channel, and the frame numbers are not sequential,
// drop the next frame.
if frame.id == next_frame.id && frame.number + 1 != next_frame.number {
if extends_channel && prev_frame.number + 1 != next_frame.number {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, and the current channel is not last, walk
// back the channel and drop all prev frames.
if frame.id != next_frame.id && !frame.is_last && next_frame.number == 0 {
self.queue.remove(i);
i = i.saturating_sub(1);
// If the frames are in the same channel, and the previous is last, drop the next frame.
if extends_channel && prev_frame.is_last {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, the next frame must be first.
if frame.id != next_frame.id && next_frame.number != 0 {
if !extends_channel && next_frame.number != 0 {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, and the current channel is not last, walk
// back the channel and drop all prev frames.
if !extends_channel && !prev_frame.is_last && next_frame.number == 0 {
// Find the index of the first frame in the queue with the same channel ID
// as the previous frame.
let first_frame =
self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible");

// Drain all frames from the previous channel.
let drained = self.queue.drain(first_frame..=i);
i = i.saturating_sub(drained.len());
continue;
}

i += 1;
}
}
Expand Down Expand Up @@ -125,21 +132,15 @@ where
return Ok(());
};

// Frame parsing should return an error if no frames were parsed
// and the above branch should be hit. Either way, return early here
// so as to not re-apply the pruning rules against all frames.
if frames.is_empty() {
error!(target: "frame-queue", "Frames should not be empty after parsing.");
return Ok(());
}

crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]);
// Optimistically extend the queue with the new frames.
self.queue.extend(frames);

// Prune frames if Holocene is active.
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
self.prune(origin);

crate::inc!(DERIVED_FRAMES_COUNT, self.queue.len() as f64, &["success"]);

Ok(())
}
}
Expand Down Expand Up @@ -476,6 +477,33 @@ pub(crate) mod tests {
assert_eq!(err, PipelineError::Eof.temp());
}

#[tokio::test]
async fn test_holocene_replace_channel() {
let frames = vec![
// -- First Channel - VALID & CLOSED --
Frame { id: [0xDD; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xDD; 16], number: 1, data: vec![0xDD; 50], is_last: true },
// -- Second Channel - VALID & NOT CLOSED / DROPPED --
Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false },
// -- Third Channel - VALID & CLOSED / REPLACES CHANNEL #2 --
Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true },
];
let encoded = encode_frames(frames.clone());
let config = RollupConfig { holocene_time: Some(0), ..Default::default() };
let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]);
mock.set_origin(BlockInfo::default());
let mut frame_queue = FrameQueue::new(mock, Arc::new(config));
assert!(frame_queue.is_holocene_active(BlockInfo::default()));
for frame in frames.iter().filter(|f| f.id != [0xEE; 16]) {
let frame_decoded = frame_queue.next_frame().await.unwrap();
assert_eq!(frame_decoded, *frame);
}
let err = frame_queue.next_frame().await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
}

#[tokio::test]
async fn test_holocene_interleaved_invalid_channel() {
let frames = vec![
Expand Down