diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 8cbb3ff5c4..11854578a6 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -67,36 +67,43 @@ where let mut i = 0; while i < self.queue.len() - 1 { - let frame = &self.queue[i]; + let prev_frame = &self.queue[i]; let next_frame = &self.queue[i + 1]; - - // If the frames are in the same channel, and the current is last, drop the next frame. - if frame.id == next_frame.id && frame.is_last { - self.queue.remove(i + 1); - continue; - } + let extends_channel = prev_frame.id == next_frame.id; // If the frames are in the same channel, and the frame numbers are not sequential, // drop the next frame. - if frame.id == next_frame.id && frame.number + 1 != next_frame.number { + if extends_channel && prev_frame.number + 1 != next_frame.number { self.queue.remove(i + 1); continue; } - // If the frames are in different channels, and the current channel is not last, walk - // back the channel and drop all prev frames. - if frame.id != next_frame.id && !frame.is_last && next_frame.number == 0 { - self.queue.remove(i); - i = i.saturating_sub(1); + // If the frames are in the same channel, and the previous is last, drop the next frame. + if extends_channel && prev_frame.is_last { + self.queue.remove(i + 1); continue; } // If the frames are in different channels, the next frame must be first. - if frame.id != next_frame.id && next_frame.number != 0 { + if !extends_channel && next_frame.number != 0 { self.queue.remove(i + 1); continue; } + // If the frames are in different channels, and the current channel is not last, walk + // back the channel and drop all prev frames. + if !extends_channel && !prev_frame.is_last && next_frame.number == 0 { + // Find the index of the first frame in the queue with the same channel ID + // as the previous frame. + let first_frame = + self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible"); + + // Drain all frames from the previous channel. + let drained = self.queue.drain(first_frame..=i); + i = i.saturating_sub(drained.len()); + continue; + } + i += 1; } } @@ -125,21 +132,15 @@ where return Ok(()); }; - // Frame parsing should return an error if no frames were parsed - // and the above branch should be hit. Either way, return early here - // so as to not re-apply the pruning rules against all frames. - if frames.is_empty() { - error!(target: "frame-queue", "Frames should not be empty after parsing."); - return Ok(()); - } - - crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]); + // Optimistically extend the queue with the new frames. self.queue.extend(frames); // Prune frames if Holocene is active. let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; self.prune(origin); + crate::inc!(DERIVED_FRAMES_COUNT, self.queue.len() as f64, &["success"]); + Ok(()) } } @@ -476,6 +477,33 @@ pub(crate) mod tests { assert_eq!(err, PipelineError::Eof.temp()); } + #[tokio::test] + async fn test_holocene_replace_channel() { + let frames = vec![ + // -- First Channel - VALID & CLOSED -- + Frame { id: [0xDD; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xDD; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + // -- Second Channel - VALID & NOT CLOSED / DROPPED -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + // -- Third Channel - VALID & CLOSED / REPLACES CHANNEL #2 -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true }, + ]; + let encoded = encode_frames(frames.clone()); + let config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]); + mock.set_origin(BlockInfo::default()); + let mut frame_queue = FrameQueue::new(mock, Arc::new(config)); + assert!(frame_queue.is_holocene_active(BlockInfo::default())); + for frame in frames.iter().filter(|f| f.id != [0xEE; 16]) { + let frame_decoded = frame_queue.next_frame().await.unwrap(); + assert_eq!(frame_decoded, *frame); + } + let err = frame_queue.next_frame().await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + } + #[tokio::test] async fn test_holocene_interleaved_invalid_channel() { let frames = vec![